Line data Source code
1 : /* Copyright (c) 2001, Matej Pfajfar.
2 : * Copyright (c) 2001-2004, Roger Dingledine.
3 : * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
4 : * Copyright (c) 2007-2021, The Tor Project, Inc. */
5 : /* See LICENSE for licensing information */
6 :
7 : /**
8 : * @file mainloop_pubsub.c
9 : * @brief Connect the publish-subscribe code to the main-loop.
10 : *
11 : * This module is responsible for instantiating all the channels used by the
12 : * publish-subscribe code, and making sure that each one's messages are
13 : * processed when appropriate.
14 : **/
15 :
16 : #include "orconfig.h"
17 :
18 : #include "core/or/or.h"
19 : #include "core/mainloop/mainloop.h"
20 : #include "core/mainloop/mainloop_pubsub.h"
21 :
22 : #include "lib/container/smartlist.h"
23 : #include "lib/dispatch/dispatch.h"
24 : #include "lib/dispatch/dispatch_naming.h"
25 : #include "lib/evloop/compat_libevent.h"
26 : #include "lib/pubsub/pubsub.h"
27 : #include "lib/pubsub/pubsub_build.h"
28 :
29 : /**
30 : * Dispatcher to use for delivering messages.
31 : **/
32 : static dispatch_t *the_dispatcher = NULL;
33 : static pubsub_items_t *the_pubsub_items = NULL;
34 : /**
35 : * A list of mainloop_event_t, indexed by channel ID, to flush the messages
36 : * on a channel.
37 : **/
38 : static smartlist_t *alert_events = NULL;
39 :
40 : /**
41 : * Mainloop event callback: flush all the messages in a channel.
42 : *
43 : * The channel is encoded as a pointer, and passed via arg.
44 : **/
45 : static void
46 0 : flush_channel_event(mainloop_event_t *ev, void *arg)
47 : {
48 0 : (void)ev;
49 0 : if (!the_dispatcher)
50 : return;
51 :
52 0 : channel_id_t chan = (channel_id_t)(uintptr_t)(arg);
53 0 : dispatch_flush(the_dispatcher, chan, INT_MAX);
54 : }
55 :
56 : /**
57 : * Construct our global pubsub object from <b>builder</b>. Return 0 on
58 : * success, -1 on failure. */
59 : int
60 235 : tor_mainloop_connect_pubsub(struct pubsub_builder_t *builder)
61 : {
62 235 : int rv = -1;
63 235 : tor_mainloop_disconnect_pubsub();
64 :
65 235 : the_dispatcher = pubsub_builder_finalize(builder, &the_pubsub_items);
66 235 : if (! the_dispatcher)
67 0 : goto err;
68 :
69 235 : rv = 0;
70 235 : goto done;
71 0 : err:
72 0 : tor_mainloop_disconnect_pubsub();
73 235 : done:
74 235 : return rv;
75 : }
76 :
77 : /**
78 : * Install libevent events for all of the pubsub channels.
79 : *
80 : * Invoke this after tor_mainloop_connect_pubsub, and after libevent has been
81 : * initialized.
82 : */
83 : void
84 0 : tor_mainloop_connect_pubsub_events(void)
85 : {
86 0 : tor_assert(the_dispatcher);
87 0 : tor_assert(! alert_events);
88 :
89 0 : const size_t num_channels = get_num_channel_ids();
90 0 : alert_events = smartlist_new();
91 0 : for (size_t i = 0; i < num_channels; ++i) {
92 0 : smartlist_add(alert_events,
93 0 : mainloop_event_postloop_new(flush_channel_event,
94 : (void*)(uintptr_t)(i)));
95 : }
96 0 : }
97 :
98 : /**
99 : * Dispatch alertfn callback: do nothing. Implements DELIV_NEVER.
100 : **/
101 : static void
102 0 : alertfn_never(dispatch_t *d, channel_id_t chan, void *arg)
103 : {
104 0 : (void)d;
105 0 : (void)chan;
106 0 : (void)arg;
107 0 : }
108 :
109 : /**
110 : * Dispatch alertfn callback: activate a mainloop event. Implements
111 : * DELIV_PROMPT.
112 : **/
113 : static void
114 0 : alertfn_prompt(dispatch_t *d, channel_id_t chan, void *arg)
115 : {
116 0 : (void)d;
117 0 : (void)chan;
118 0 : mainloop_event_t *event = arg;
119 0 : mainloop_event_activate(event);
120 0 : }
121 :
122 : /**
123 : * Dispatch alertfn callback: flush all messages right now. Implements
124 : * DELIV_IMMEDIATE.
125 : **/
126 : static void
127 0 : alertfn_immediate(dispatch_t *d, channel_id_t chan, void *arg)
128 : {
129 0 : (void) arg;
130 0 : dispatch_flush(d, chan, INT_MAX);
131 0 : }
132 :
133 : /**
134 : * Set the strategy to be used for delivering messages on the named channel.
135 : *
136 : * This function needs to be called once globally for each channel, to
137 : * set up how messages are delivered.
138 : **/
139 : int
140 0 : tor_mainloop_set_delivery_strategy(const char *msg_channel_name,
141 : deliv_strategy_t strategy)
142 : {
143 0 : channel_id_t chan = get_channel_id(msg_channel_name);
144 0 : if (BUG(chan == ERROR_ID) ||
145 0 : BUG(chan >= smartlist_len(alert_events)))
146 0 : return -1;
147 :
148 0 : switch (strategy) {
149 0 : case DELIV_NEVER:
150 0 : dispatch_set_alert_fn(the_dispatcher, chan, alertfn_never, NULL);
151 0 : break;
152 0 : case DELIV_PROMPT:
153 0 : dispatch_set_alert_fn(the_dispatcher, chan, alertfn_prompt,
154 0 : smartlist_get(alert_events, chan));
155 0 : break;
156 0 : case DELIV_IMMEDIATE:
157 0 : dispatch_set_alert_fn(the_dispatcher, chan, alertfn_immediate, NULL);
158 0 : break;
159 : }
160 : return 0;
161 : }
162 :
163 : /**
164 : * Remove all pubsub dispatchers and events from the mainloop.
165 : **/
166 : void
167 470 : tor_mainloop_disconnect_pubsub(void)
168 : {
169 470 : if (the_pubsub_items) {
170 235 : pubsub_items_clear_bindings(the_pubsub_items);
171 235 : pubsub_items_free(the_pubsub_items);
172 : }
173 470 : if (alert_events) {
174 0 : SMARTLIST_FOREACH(alert_events, mainloop_event_t *, ev,
175 : mainloop_event_free(ev));
176 0 : smartlist_free(alert_events);
177 : }
178 470 : dispatch_free(the_dispatcher);
179 470 : }
|