Tor  0.4.7.0-alpha-dev
dispatch_core.c
Go to the documentation of this file.
1 /* Copyright (c) 2001, Matej Pfajfar.
2  * Copyright (c) 2001-2004, Roger Dingledine.
3  * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
4  * Copyright (c) 2007-2021, The Tor Project, Inc. */
5 /* See LICENSE for licensing information */
6 
7 /**
8  * \file dispatch_core.c
9  * \brief Core module for sending and receiving messages.
10  */
11 
12 #define DISPATCH_PRIVATE
13 #include "orconfig.h"
14 
15 #include "lib/dispatch/dispatch.h"
18 
19 #include "lib/malloc/malloc.h"
20 #include "lib/log/util_bug.h"
21 
22 #include <string.h>
23 
24 /**
25  * Use <b>d</b> to drop all storage held for <b>msg</b>.
26  *
27  * (We need the dispatcher so we know how to free the auxiliary data.)
28  **/
29 void
31 {
32  if (!msg)
33  return;
34 
35  d->typefns[msg->type].free_fn(msg->aux_data__);
36  tor_free(msg);
37 }
38 
39 /**
40  * Format the auxiliary data held by msg.
41  **/
42 char *
43 dispatch_fmt_msg_data(const dispatch_t *d, const msg_t *msg)
44 {
45  if (!msg)
46  return NULL;
47 
48  return d->typefns[msg->type].fmt_fn(msg->aux_data__);
49 }
50 
51 /**
52  * Release all storage held by <b>d</b>.
53  **/
54 void
56 {
57  if (d == NULL)
58  return;
59 
60  size_t n_queues = d->n_queues;
61  for (size_t i = 0; i < n_queues; ++i) {
62  msg_t *m, *mtmp;
63  TOR_SIMPLEQ_FOREACH_SAFE(m, &d->queues[i].queue, next, mtmp) {
64  dispatch_free_msg(d, m);
65  }
66  }
67 
68  size_t n_msgs = d->n_msgs;
69 
70  for (size_t i = 0; i < n_msgs; ++i) {
71  tor_free(d->table[i]);
72  }
73  tor_free(d->table);
74  tor_free(d->typefns);
75  tor_free(d->queues);
76 
77  // This is the only time we will treat d->cfg as non-const.
78  //dispatch_cfg_free_((dispatch_items_t *) d->cfg);
79 
80  tor_free(d);
81 }
82 
83 /**
84  * Tell the dispatcher to call <b>fn</b> with <b>userdata</b> whenever
85  * <b>chan</b> becomes nonempty. Return 0 on success, -1 on error.
86  **/
87 int
88 dispatch_set_alert_fn(dispatch_t *d, channel_id_t chan,
89  dispatch_alertfn_t fn, void *userdata)
90 {
91  if (BUG(chan >= d->n_queues))
92  return -1;
93 
94  dqueue_t *q = &d->queues[chan];
95  q->alert_fn = fn;
96  q->alert_fn_arg = userdata;
97  return 0;
98 }
99 
100 /**
101  * Send a message on the appropriate channel notifying that channel if
102  * necessary.
103  *
104  * This function takes ownership of the auxiliary data; it can't be static or
105  * stack-allocated, and the caller is not allowed to use it afterwards.
106  *
107  * This function does not check the various vields of the message object for
108  * consistency.
109  **/
110 int
112  subsys_id_t sender,
113  channel_id_t channel,
114  message_id_t msg,
115  msg_type_id_t type,
116  msg_aux_data_t auxdata)
117 {
118  if (!d->table[msg]) {
119  /* Fast path: nobody wants this data. */
120 
121  d->typefns[type].free_fn(auxdata);
122  return 0;
123  }
124 
125  msg_t *m = tor_malloc(sizeof(msg_t));
126 
127  m->sender = sender;
128  m->channel = channel;
129  m->msg = msg;
130  m->type = type;
131  memcpy(&m->aux_data__, &auxdata, sizeof(msg_aux_data_t));
132 
133  return dispatch_send_msg(d, m);
134 }
135 
136 int
137 dispatch_send_msg(dispatch_t *d, msg_t *m)
138 {
139  if (BUG(!d))
140  goto err;
141  if (BUG(!m))
142  goto err;
143  if (BUG(m->channel >= d->n_queues))
144  goto err;
145  if (BUG(m->msg >= d->n_msgs))
146  goto err;
147 
148  dtbl_entry_t *ent = d->table[m->msg];
149  if (ent) {
150  if (BUG(m->type != ent->type))
151  goto err;
152  if (BUG(m->channel != ent->channel))
153  goto err;
154  }
155 
156  return dispatch_send_msg_unchecked(d, m);
157  err:
158  /* Probably it isn't safe to free m, since type could be wrong. */
159  return -1;
160 }
161 
162 /**
163  * Send a message on the appropriate queue, notifying that queue if necessary.
164  *
165  * This function takes ownership of the message object and its auxiliary data;
166  * it can't be static or stack-allocated, and the caller isn't allowed to use
167  * it afterwards.
168  *
169  * This function does not check the various fields of the message object for
170  * consistency, and can crash if they are out of range. Only functions that
171  * have already constructed the message in a safe way, or checked it for
172  * correctness themselves, should call this function.
173  **/
174 int
176 {
177  /* Find the right queue. */
178  dqueue_t *q = &d->queues[m->channel];
179  bool was_empty = TOR_SIMPLEQ_EMPTY(&q->queue);
180 
181  /* Append the message. */
182  TOR_SIMPLEQ_INSERT_TAIL(&q->queue, m, next);
183 
184  if (debug_logging_enabled()) {
185  char *arg = dispatch_fmt_msg_data(d, m);
186  log_debug(LD_MESG,
187  "Queued: %s (%s) from %s, on %s.",
188  get_message_id_name(m->msg),
189  arg,
190  get_subsys_id_name(m->sender),
191  get_channel_id_name(m->channel));
192  tor_free(arg);
193  }
194 
195  /* If we just made the queue nonempty for the first time, call the alert
196  * function. */
197  if (was_empty) {
198  q->alert_fn(d, m->channel, q->alert_fn_arg);
199  }
200 
201  return 0;
202 }
203 
204 /**
205  * Run all of the callbacks on <b>d</b> associated with <b>m</b>.
206  **/
207 static void
209 {
210  tor_assert(m->msg <= d->n_msgs);
211  dtbl_entry_t *ent = d->table[m->msg];
212  int n_fns = ent->n_fns;
213 
214  if (debug_logging_enabled()) {
215  char *arg = dispatch_fmt_msg_data(d, m);
216  log_debug(LD_MESG,
217  "Delivering: %s (%s) from %s, on %s:",
218  get_message_id_name(m->msg),
219  arg,
220  get_subsys_id_name(m->sender),
221  get_channel_id_name(m->channel));
222  tor_free(arg);
223  }
224 
225  int i;
226  for (i=0; i < n_fns; ++i) {
227  if (ent->rcv[i].enabled) {
228  log_debug(LD_MESG, " Delivering to %s.",
229  get_subsys_id_name(ent->rcv[i].sys));
230  ent->rcv[i].fn(m);
231  }
232  }
233 }
234 
235 /**
236  * Run up to <b>max_msgs</b> callbacks for messages on the channel <b>ch</b>
237  * on the given dispatcher. Return 0 on success or recoverable failure,
238  * -1 on unrecoverable error.
239  **/
240 int
241 dispatch_flush(dispatch_t *d, channel_id_t ch, int max_msgs)
242 {
243  if (BUG(ch >= d->n_queues))
244  return 0;
245 
246  int n_flushed = 0;
247  dqueue_t *q = &d->queues[ch];
248 
249  while (n_flushed < max_msgs) {
250  msg_t *m = TOR_SIMPLEQ_FIRST(&q->queue);
251  if (!m)
252  break;
253  TOR_SIMPLEQ_REMOVE_HEAD(&q->queue, next);
255  dispatch_free_msg(d, m);
256  ++n_flushed;
257  }
258 
259  return 0;
260 }
Low-level APIs for message-passing system.
void(* dispatch_alertfn_t)(struct dispatch_t *, channel_id_t, void *)
Definition: dispatch.h:98
struct dispatch_t dispatch_t
Definition: dispatch.h:53
int dispatch_send(dispatch_t *d, subsys_id_t sender, channel_id_t channel, message_id_t msg, msg_type_id_t type, msg_aux_data_t auxdata)
void dispatch_free_msg_(const dispatch_t *d, msg_t *msg)
Definition: dispatch_core.c:30
char * dispatch_fmt_msg_data(const dispatch_t *d, const msg_t *msg)
Definition: dispatch_core.c:43
int dispatch_flush(dispatch_t *d, channel_id_t ch, int max_msgs)
int dispatch_set_alert_fn(dispatch_t *d, channel_id_t chan, dispatch_alertfn_t fn, void *userdata)
Definition: dispatch_core.c:88
static void dispatcher_run_msg_cbs(const dispatch_t *d, msg_t *m)
int dispatch_send_msg_unchecked(dispatch_t *d, msg_t *m)
void dispatch_free_(dispatch_t *d)
Definition: dispatch_core.c:55
Header for dispatch_naming.c.
const char * get_channel_id_name(channel_id_t)
private structures used for the dispatcher module
static bool debug_logging_enabled(void)
Definition: log.h:217
#define LD_MESG
Definition: log.h:121
Headers for util_malloc.c.
#define tor_free(p)
Definition: malloc.h:52
uint16_t subsys_id_t
Definition: msgtypes.h:22
uint16_t msg_type_id_t
Definition: msgtypes.h:29
Definition: msgtypes.h:50
msg_type_id_t type
Definition: msgtypes.h:57
msg_aux_data_t aux_data__
Definition: msgtypes.h:60
Macros to manage assertions, fatal and non-fatal.
#define tor_assert(expr)
Definition: util_bug.h:102