Tor  0.4.7.0-alpha-dev
pubsub_build.c
Go to the documentation of this file.
1 /* Copyright (c) 2001, Matej Pfajfar.
2  * Copyright (c) 2001-2004, Roger Dingledine.
3  * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
4  * Copyright (c) 2007-2021, The Tor Project, Inc. */
5 /* See LICENSE for licensing information */
6 
7 /**
8  * @file pubsub_build.c
9  * @brief Construct a dispatch_t in a safer, more OO way.
10  **/
11 
12 #define PUBSUB_PRIVATE
13 
14 #include "lib/dispatch/dispatch.h"
17 #include "lib/dispatch/msgtypes.h"
23 
25 #include "lib/log/util_bug.h"
26 #include "lib/malloc/malloc.h"
27 
28  #include <string.h>
29 
30 /** Construct and return a new empty pubsub_items_t. */
31 static pubsub_items_t *
33 {
34  pubsub_items_t *cfg = tor_malloc_zero(sizeof(*cfg));
35  cfg->items = smartlist_new();
36  cfg->type_items = smartlist_new();
37  return cfg;
38 }
39 
40 /** Release all storage held in a pubsub_items_t. */
41 void
43 {
44  if (! cfg)
45  return;
46  SMARTLIST_FOREACH(cfg->items, pubsub_cfg_t *, item, tor_free(item));
47  SMARTLIST_FOREACH(cfg->type_items,
48  pubsub_type_cfg_t *, item, tor_free(item));
49  smartlist_free(cfg->items);
50  smartlist_free(cfg->type_items);
51  tor_free(cfg);
52 }
53 
54 /** Construct and return a new pubsub_builder_t. */
57 {
58  dispatch_naming_init();
59 
60  pubsub_builder_t *pb = tor_malloc_zero(sizeof(*pb));
61  pb->cfg = dcfg_new();
62  pb->items = pubsub_items_new();
63  return pb;
64 }
65 
66 /**
67  * Release all storage held by a pubsub_builder_t.
68  *
69  * You'll (mostly) only want to call this function on an error case: if you're
70  * constructing a dispatch_t instead, you should call
71  * pubsub_builder_finalize() to consume the pubsub_builder_t.
72  */
73 void
75 {
76  if (pb == NULL)
77  return;
78  pubsub_items_free(pb->items);
79  dcfg_free(pb->cfg);
80  tor_free(pb);
81 }
82 
83 /**
84  * Create and return a pubsub_connector_t for the subsystem with ID
85  * <b>subsys</b> to use in adding publications, subscriptions, and types to
86  * <b>builder</b>.
87  **/
90  subsys_id_t subsys)
91 {
92  tor_assert(builder);
93  ++builder->n_connectors;
94 
95  pubsub_connector_t *con = tor_malloc_zero(sizeof(*con));
96 
97  con->builder = builder;
98  con->subsys_id = subsys;
99 
100  return con;
101 }
102 
103 /**
104  * Release all storage held by a pubsub_connector_t.
105  **/
106 void
108 {
109  if (!con)
110  return;
111 
112  if (con->builder) {
113  --con->builder->n_connectors;
114  tor_assert(con->builder->n_connectors >= 0);
115  }
116  tor_free(con);
117 }
118 
119 /**
120  * Use <b>con</b> to add a request for being able to publish messages of type
121  * <b>msg</b> with auxiliary data of <b>type</b> on <b>channel</b>.
122  **/
123 int
125  pub_binding_t *out,
126  channel_id_t channel,
127  message_id_t msg,
128  msg_type_id_t type,
129  unsigned flags,
130  const char *file,
131  unsigned line)
132 {
133  pubsub_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg));
134 
135  memset(out, 0, sizeof(*out));
136  cfg->is_publish = true;
137 
138  out->msg_template.sender = cfg->subsys = con->subsys_id;
139  out->msg_template.channel = cfg->channel = channel;
140  out->msg_template.msg = cfg->msg = msg;
141  out->msg_template.type = cfg->type = type;
142 
143  cfg->flags = flags;
144  cfg->added_by_file = file;
145  cfg->added_by_line = line;
146 
147  /* We're grabbing a pointer to the pub_binding_t so we can tell it about
148  * the dispatcher later on.
149  */
150  cfg->pub_binding = out;
151 
152  smartlist_add(con->builder->items->items, cfg);
153 
154  if (dcfg_msg_set_type(con->builder->cfg, msg, type) < 0)
155  goto err;
156  if (dcfg_msg_set_chan(con->builder->cfg, msg, channel) < 0)
157  goto err;
158 
159  return 0;
160  err:
161  ++con->builder->n_errors;
162  return -1;
163 }
164 
165 /**
166  * Use <b>con</b> to add a request for being able to publish messages of type
167  * <b>msg</b> with auxiliary data of <b>type</b> on <b>channel</b>,
168  * passing them to the callback in <b>recv_fn</b>.
169  **/
170 int
172  recv_fn_t recv_fn,
173  channel_id_t channel,
174  message_id_t msg,
175  msg_type_id_t type,
176  unsigned flags,
177  const char *file,
178  unsigned line)
179 {
180  pubsub_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg));
181 
182  cfg->is_publish = false;
183  cfg->subsys = con->subsys_id;
184  cfg->channel = channel;
185  cfg->msg = msg;
186  cfg->type = type;
187  cfg->flags = flags;
188  cfg->added_by_file = file;
189  cfg->added_by_line = line;
190 
191  cfg->recv_fn = recv_fn;
192 
193  smartlist_add(con->builder->items->items, cfg);
194 
195  if (dcfg_msg_set_type(con->builder->cfg, msg, type) < 0)
196  goto err;
197  if (dcfg_msg_set_chan(con->builder->cfg, msg, channel) < 0)
198  goto err;
199  if (! (flags & DISP_FLAG_STUB)) {
200  if (dcfg_add_recv(con->builder->cfg, msg, cfg->subsys, recv_fn) < 0)
201  goto err;
202  }
203 
204  return 0;
205  err:
206  ++con->builder->n_errors;
207  return -1;
208 }
209 
210 /**
211  * Use <b>con</b> to define the functions to use for manipulating the type
212  * <b>type</b>. Any function pointers left as NULL will be implemented as
213  * no-ops.
214  **/
215 int
217  msg_type_id_t type,
218  dispatch_typefns_t *fns,
219  const char *file,
220  unsigned line)
221 {
222  pubsub_type_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg));
223  cfg->type = type;
224  memcpy(&cfg->fns, fns, sizeof(*fns));
225  cfg->subsys = con->subsys_id;
226  cfg->added_by_file = file;
227  cfg->added_by_line = line;
228 
229  smartlist_add(con->builder->items->type_items, cfg);
230 
231  if (dcfg_type_set_fns(con->builder->cfg, type, fns) < 0)
232  goto err;
233 
234  return 0;
235  err:
236  ++con->builder->n_errors;
237  return -1;
238 }
239 
240 /**
241  * Initialize the dispatch_ptr field in every relevant publish binding
242  * for <b>d</b>.
243  */
244 static void
246  dispatch_t *d)
247 {
248  SMARTLIST_FOREACH_BEGIN(items->items, pubsub_cfg_t *, cfg) {
249  if (cfg->pub_binding) {
250  // XXXX we could skip this for STUB publishers, and for any publishers
251  // XXXX where all subscribers are STUB.
252  cfg->pub_binding->dispatch_ptr = d;
253  }
254  } SMARTLIST_FOREACH_END(cfg);
255 }
256 
257 /**
258  * Remove the dispatch_ptr fields for all the relevant publish bindings
259  * in <b>items</b>. The prevents subsequent dispatch_pub_() calls from
260  * sending messages to a dispatcher that has been freed.
261  **/
262 void
264 {
265  SMARTLIST_FOREACH_BEGIN(items->items, pubsub_cfg_t *, cfg) {
266  if (cfg->pub_binding) {
267  cfg->pub_binding->dispatch_ptr = NULL;
268  }
269  } SMARTLIST_FOREACH_END(cfg);
270 }
271 
272 /**
273  * Create a new dispatcher as configured in a pubsub_builder_t.
274  *
275  * Consumes and frees its input.
276  **/
277 dispatch_t *
279  pubsub_items_t **items_out)
280 {
281  dispatch_t *dispatcher = NULL;
282  tor_assert_nonfatal(builder->n_connectors == 0);
283 
284  if (pubsub_builder_check(builder) < 0)
285  goto err;
286 
287  if (builder->n_errors) {
288  log_warn(LD_GENERAL, "At least one error occurred previously when "
289  "configuring the dispatcher.");
290  goto err;
291  }
292 
293  dispatcher = dispatch_new(builder->cfg);
294 
295  if (!dispatcher)
296  goto err;
297 
298  pubsub_items_install_bindings(builder->items, dispatcher);
299  if (items_out) {
300  *items_out = builder->items;
301  builder->items = NULL; /* Prevent free */
302  }
303 
304  err:
305  pubsub_builder_free(builder);
306  return dispatcher;
307 }
Low-level APIs for message-passing system.
struct dispatch_t dispatch_t
Definition: dispatch.h:53
int dcfg_msg_set_chan(dispatch_cfg_t *cfg, message_id_t msg, channel_id_t chan)
Definition: dispatch_cfg.c:63
int dcfg_type_set_fns(dispatch_cfg_t *cfg, msg_type_id_t type, const dispatch_typefns_t *fns)
Definition: dispatch_cfg.c:81
dispatch_cfg_t * dcfg_new(void)
Definition: dispatch_cfg.c:30
int dcfg_msg_set_type(dispatch_cfg_t *cfg, message_id_t msg, msg_type_id_t type)
Definition: dispatch_cfg.c:45
int dcfg_add_recv(dispatch_cfg_t *cfg, message_id_t msg, subsys_id_t sys, recv_fn_t fn)
Definition: dispatch_cfg.c:101
Header for dispatch_cfg.c.
#define dcfg_free(cfg)
Definition: dispatch_cfg.h:40
Header for dispatch_naming.c.
dispatch_t * dispatch_new(const dispatch_cfg_t *cfg)
Definition: dispatch_new.c:113
#define LD_GENERAL
Definition: log.h:62
Headers for util_malloc.c.
#define tor_free(p)
Definition: malloc.h:52
Types used for messages in the dispatcher code.
uint16_t subsys_id_t
Definition: msgtypes.h:22
uint16_t msg_type_id_t
Definition: msgtypes.h:29
void(* recv_fn_t)(const msg_t *m)
Definition: msgtypes.h:66
Declaration of pub_binding_t.
pubsub_connector_t * pubsub_connector_for_subsystem(pubsub_builder_t *builder, subsys_id_t subsys)
Definition: pubsub_build.c:89
dispatch_t * pubsub_builder_finalize(pubsub_builder_t *builder, pubsub_items_t **items_out)
Definition: pubsub_build.c:278
int pubsub_add_pub_(pubsub_connector_t *con, pub_binding_t *out, channel_id_t channel, message_id_t msg, msg_type_id_t type, unsigned flags, const char *file, unsigned line)
Definition: pubsub_build.c:124
void pubsub_connector_free_(pubsub_connector_t *con)
Definition: pubsub_build.c:107
void pubsub_items_free_(pubsub_items_t *cfg)
Definition: pubsub_build.c:42
void pubsub_builder_free_(pubsub_builder_t *pb)
Definition: pubsub_build.c:74
static pubsub_items_t * pubsub_items_new(void)
Definition: pubsub_build.c:32
int pubsub_connector_register_type_(pubsub_connector_t *con, msg_type_id_t type, dispatch_typefns_t *fns, const char *file, unsigned line)
Definition: pubsub_build.c:216
pubsub_builder_t * pubsub_builder_new(void)
Definition: pubsub_build.c:56
void pubsub_items_clear_bindings(pubsub_items_t *items)
Definition: pubsub_build.c:263
static void pubsub_items_install_bindings(pubsub_items_t *items, dispatch_t *d)
Definition: pubsub_build.c:245
int pubsub_add_sub_(pubsub_connector_t *con, recv_fn_t recv_fn, channel_id_t channel, message_id_t msg, msg_type_id_t type, unsigned flags, const char *file, unsigned line)
Definition: pubsub_build.c:171
Header used for constructing the OO publish-subscribe facility.
#define pubsub_builder_free(db)
Definition: pubsub_build.h:50
int pubsub_builder_check(pubsub_builder_t *)
Definition: pubsub_check.c:399
struct pubsub_items_t pubsub_items_t
Definition: pubsub_build.h:35
struct pubsub_builder_t pubsub_builder_t
Definition: pubsub_build.h:28
#define pubsub_items_free(cfg)
Definition: pubsub_build.h:93
private structures used for configuring dispatchers and messages.
Header for functions that add relationships to a pubsub builder.
struct pubsub_connector_t pubsub_connector_t
Flags that can be set on publish/subscribe messages.
#define DISP_FLAG_STUB
Definition: pubsub_flags.h:30
Header for smartlist.c.
smartlist_t * smartlist_new(void)
void smartlist_add(smartlist_t *sl, void *element)
#define SMARTLIST_FOREACH_BEGIN(sl, type, var)
#define SMARTLIST_FOREACH(sl, type, var, cmd)
msg_type_id_t type
Definition: msgtypes.h:57
Macros to manage assertions, fatal and non-fatal.
#define tor_assert(expr)
Definition: util_bug.h:102