Line data Source code
1 : /* Copyright (c) 2001, Matej Pfajfar.
2 : * Copyright (c) 2001-2004, Roger Dingledine.
3 : * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
4 : * Copyright (c) 2007-2021, The Tor Project, Inc. */
5 : /* See LICENSE for licensing information */
6 :
7 : /**
8 : * @file pubsub_build.c
9 : * @brief Construct a dispatch_t in safer, more OO way.
10 : **/
11 :
12 : #define PUBSUB_PRIVATE
13 :
14 : #include "lib/dispatch/dispatch.h"
15 : #include "lib/dispatch/dispatch_cfg.h"
16 : #include "lib/dispatch/dispatch_naming.h"
17 : #include "lib/dispatch/msgtypes.h"
18 : #include "lib/pubsub/pubsub_flags.h"
19 : #include "lib/pubsub/pub_binding_st.h"
20 : #include "lib/pubsub/pubsub_build.h"
21 : #include "lib/pubsub/pubsub_builder_st.h"
22 : #include "lib/pubsub/pubsub_connect.h"
23 :
24 : #include "lib/container/smartlist.h"
25 : #include "lib/log/util_bug.h"
26 : #include "lib/malloc/malloc.h"
27 :
28 : #include <string.h>
29 :
30 : /** Construct and return a new empty pubsub_items_t. */
31 : static pubsub_items_t *
32 262 : pubsub_items_new(void)
33 : {
34 262 : pubsub_items_t *cfg = tor_malloc_zero(sizeof(*cfg));
35 262 : cfg->items = smartlist_new();
36 262 : cfg->type_items = smartlist_new();
37 262 : return cfg;
38 : }
39 :
40 : /** Release all storage held in a pubsub_items_t. */
41 : void
42 498 : pubsub_items_free_(pubsub_items_t *cfg)
43 : {
44 498 : if (! cfg)
45 : return;
46 8169 : SMARTLIST_FOREACH(cfg->items, pubsub_cfg_t *, item, tor_free(item));
47 1517 : SMARTLIST_FOREACH(cfg->type_items,
48 : pubsub_type_cfg_t *, item, tor_free(item));
49 262 : smartlist_free(cfg->items);
50 262 : smartlist_free(cfg->type_items);
51 262 : tor_free(cfg);
52 : }
53 :
54 : /** Construct and return a new pubsub_builder_t. */
55 : pubsub_builder_t *
56 262 : pubsub_builder_new(void)
57 : {
58 262 : dispatch_naming_init();
59 :
60 262 : pubsub_builder_t *pb = tor_malloc_zero(sizeof(*pb));
61 262 : pb->cfg = dcfg_new();
62 262 : pb->items = pubsub_items_new();
63 262 : return pb;
64 : }
65 :
66 : /**
67 : * Release all storage held by a pubsub_builder_t.
68 : *
69 : * You'll (mostly) only want to call this function on an error case: if you're
70 : * constructing a dispatch_t instead, you should call
71 : * pubsub_builder_finalize() to consume the pubsub_builder_t.
72 : */
73 : void
74 274 : pubsub_builder_free_(pubsub_builder_t *pb)
75 : {
76 274 : if (pb == NULL)
77 : return;
78 262 : pubsub_items_free(pb->items);
79 262 : dcfg_free(pb->cfg);
80 262 : tor_free(pb);
81 : }
82 :
83 : /**
84 : * Create and return a pubsub_connector_t for the subsystem with ID
85 : * <b>subsys</b> to use in adding publications, subscriptions, and types to
86 : * <b>builder</b>.
87 : **/
88 : pubsub_connector_t *
89 5684 : pubsub_connector_for_subsystem(pubsub_builder_t *builder,
90 : subsys_id_t subsys)
91 : {
92 5684 : tor_assert(builder);
93 5684 : ++builder->n_connectors;
94 :
95 5684 : pubsub_connector_t *con = tor_malloc_zero(sizeof(*con));
96 :
97 5684 : con->builder = builder;
98 5684 : con->subsys_id = subsys;
99 :
100 5684 : return con;
101 : }
102 :
103 : /**
104 : * Release all storage held by a pubsub_connector_t.
105 : **/
106 : void
107 5686 : pubsub_connector_free_(pubsub_connector_t *con)
108 : {
109 5686 : if (!con)
110 : return;
111 :
112 5684 : if (con->builder) {
113 5684 : --con->builder->n_connectors;
114 5684 : tor_assert(con->builder->n_connectors >= 0);
115 : }
116 5684 : tor_free(con);
117 : }
118 :
119 : /**
120 : * Use <b>con</b> to add a request for being able to publish messages of type
121 : * <b>msg</b> with auxiliary data of <b>type</b> on <b>channel</b>.
122 : **/
123 : int
124 1277 : pubsub_add_pub_(pubsub_connector_t *con,
125 : pub_binding_t *out,
126 : channel_id_t channel,
127 : message_id_t msg,
128 : msg_type_id_t type,
129 : unsigned flags,
130 : const char *file,
131 : unsigned line)
132 : {
133 1277 : pubsub_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg));
134 :
135 1277 : memset(out, 0, sizeof(*out));
136 1277 : cfg->is_publish = true;
137 :
138 1277 : out->msg_template.sender = cfg->subsys = con->subsys_id;
139 1277 : out->msg_template.channel = cfg->channel = channel;
140 1277 : out->msg_template.msg = cfg->msg = msg;
141 1277 : out->msg_template.type = cfg->type = type;
142 :
143 1277 : cfg->flags = flags;
144 1277 : cfg->added_by_file = file;
145 1277 : cfg->added_by_line = line;
146 :
147 : /* We're grabbing a pointer to the pub_binding_t so we can tell it about
148 : * the dispatcher later on.
149 : */
150 1277 : cfg->pub_binding = out;
151 :
152 1277 : smartlist_add(con->builder->items->items, cfg);
153 :
154 1277 : if (dcfg_msg_set_type(con->builder->cfg, msg, type) < 0)
155 1 : goto err;
156 1276 : if (dcfg_msg_set_chan(con->builder->cfg, msg, channel) < 0)
157 1 : goto err;
158 :
159 : return 0;
160 2 : err:
161 2 : ++con->builder->n_errors;
162 2 : return -1;
163 : }
164 :
165 : /**
166 : * Use <b>con</b> to add a request for being able to publish messages of type
167 : * <b>msg</b> with auxiliary data of <b>type</b> on <b>channel</b>,
168 : * passing them to the callback in <b>recv_fn</b>.
169 : **/
170 : int
171 6630 : pubsub_add_sub_(pubsub_connector_t *con,
172 : recv_fn_t recv_fn,
173 : channel_id_t channel,
174 : message_id_t msg,
175 : msg_type_id_t type,
176 : unsigned flags,
177 : const char *file,
178 : unsigned line)
179 : {
180 6630 : pubsub_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg));
181 :
182 6630 : cfg->is_publish = false;
183 6630 : cfg->subsys = con->subsys_id;
184 6630 : cfg->channel = channel;
185 6630 : cfg->msg = msg;
186 6630 : cfg->type = type;
187 6630 : cfg->flags = flags;
188 6630 : cfg->added_by_file = file;
189 6630 : cfg->added_by_line = line;
190 :
191 6630 : cfg->recv_fn = recv_fn;
192 :
193 6630 : smartlist_add(con->builder->items->items, cfg);
194 :
195 6630 : if (dcfg_msg_set_type(con->builder->cfg, msg, type) < 0)
196 0 : goto err;
197 6630 : if (dcfg_msg_set_chan(con->builder->cfg, msg, channel) < 0)
198 0 : goto err;
199 6630 : if (! (flags & DISP_FLAG_STUB)) {
200 6624 : if (dcfg_add_recv(con->builder->cfg, msg, cfg->subsys, recv_fn) < 0)
201 0 : goto err;
202 : }
203 :
204 : return 0;
205 0 : err:
206 0 : ++con->builder->n_errors;
207 0 : return -1;
208 : }
209 :
210 : /**
211 : * Use <b>con</b> to define the functions to use for manipulating the type
212 : * <b>type</b>. Any function pointers left as NULL will be implemented as
213 : * no-ops.
214 : **/
215 : int
216 1255 : pubsub_connector_register_type_(pubsub_connector_t *con,
217 : msg_type_id_t type,
218 : dispatch_typefns_t *fns,
219 : const char *file,
220 : unsigned line)
221 : {
222 1255 : pubsub_type_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg));
223 1255 : cfg->type = type;
224 1255 : memcpy(&cfg->fns, fns, sizeof(*fns));
225 1255 : cfg->subsys = con->subsys_id;
226 1255 : cfg->added_by_file = file;
227 1255 : cfg->added_by_line = line;
228 :
229 1255 : smartlist_add(con->builder->items->type_items, cfg);
230 :
231 1255 : if (dcfg_type_set_fns(con->builder->cfg, type, fns) < 0)
232 1 : goto err;
233 :
234 : return 0;
235 1 : err:
236 1 : ++con->builder->n_errors;
237 1 : return -1;
238 : }
239 :
240 : /**
241 : * Initialize the dispatch_ptr field in every relevant publish binding
242 : * for <b>d</b>.
243 : */
244 : static void
245 256 : pubsub_items_install_bindings(pubsub_items_t *items,
246 : dispatch_t *d)
247 : {
248 8136 : SMARTLIST_FOREACH_BEGIN(items->items, pubsub_cfg_t *, cfg) {
249 7880 : if (cfg->pub_binding) {
250 : // XXXX we could skip this for STUB publishers, and for any publishers
251 : // XXXX where all subscribers are STUB.
252 1263 : cfg->pub_binding->dispatch_ptr = d;
253 : }
254 7880 : } SMARTLIST_FOREACH_END(cfg);
255 256 : }
256 :
257 : /**
258 : * Remove the dispatch_ptr fields for all the relevant publish bindings
259 : * in <b>items</b>. The prevents subsequent dispatch_pub_() calls from
260 : * sending messages to a dispatcher that has been freed.
261 : **/
262 : void
263 236 : pubsub_items_clear_bindings(pubsub_items_t *items)
264 : {
265 2825 : SMARTLIST_FOREACH_BEGIN(items->items, pubsub_cfg_t *, cfg) {
266 2589 : if (cfg->pub_binding) {
267 1177 : cfg->pub_binding->dispatch_ptr = NULL;
268 : }
269 2589 : } SMARTLIST_FOREACH_END(cfg);
270 236 : }
271 :
272 : /**
273 : * Create a new dispatcher as configured in a pubsub_builder_t.
274 : *
275 : * Consumes and frees its input.
276 : **/
277 : dispatch_t *
278 262 : pubsub_builder_finalize(pubsub_builder_t *builder,
279 : pubsub_items_t **items_out)
280 : {
281 262 : dispatch_t *dispatcher = NULL;
282 262 : tor_assert_nonfatal(builder->n_connectors == 0);
283 :
284 262 : if (pubsub_builder_check(builder) < 0)
285 5 : goto err;
286 :
287 257 : if (builder->n_errors) {
288 1 : log_warn(LD_GENERAL, "At least one error occurred previously when "
289 : "configuring the dispatcher.");
290 1 : goto err;
291 : }
292 :
293 256 : dispatcher = dispatch_new(builder->cfg);
294 :
295 256 : if (!dispatcher)
296 0 : goto err;
297 :
298 256 : pubsub_items_install_bindings(builder->items, dispatcher);
299 256 : if (items_out) {
300 236 : *items_out = builder->items;
301 236 : builder->items = NULL; /* Prevent free */
302 : }
303 :
304 20 : err:
305 262 : pubsub_builder_free(builder);
306 262 : return dispatcher;
307 : }
|