Line data Source code
1 : /* Copyright (c) 2001, Matej Pfajfar.
2 : * Copyright (c) 2001-2004, Roger Dingledine.
3 : * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
4 : * Copyright (c) 2007-2021, The Tor Project, Inc. */
5 : /* See LICENSE for licensing information */
6 :
7 : /**
8 : * \file dispatch_core.c
9 : * \brief Core module for sending and receiving messages.
10 : */
11 :
12 : #define DISPATCH_PRIVATE
13 : #include "orconfig.h"
14 :
15 : #include "lib/dispatch/dispatch.h"
16 : #include "lib/dispatch/dispatch_st.h"
17 : #include "lib/dispatch/dispatch_naming.h"
18 :
19 : #include "lib/malloc/malloc.h"
20 : #include "lib/log/util_bug.h"
21 :
22 : #include <string.h>
23 :
24 : /**
25 : * Use <b>d</b> to drop all storage held for <b>msg</b>.
26 : *
27 : * (We need the dispatcher so we know how to free the auxiliary data.)
28 : **/
29 : void
30 150 : dispatch_free_msg_(const dispatch_t *d, msg_t *msg)
31 : {
32 150 : if (!msg)
33 : return;
34 :
35 150 : d->typefns[msg->type].free_fn(msg->aux_data__);
36 150 : tor_free(msg);
37 : }
38 :
39 : /**
40 : * Format the auxiliary data held by msg.
41 : **/
42 : char *
43 211 : dispatch_fmt_msg_data(const dispatch_t *d, const msg_t *msg)
44 : {
45 211 : if (!msg)
46 : return NULL;
47 :
48 211 : return d->typefns[msg->type].fmt_fn(msg->aux_data__);
49 : }
50 :
51 : /**
52 : * Release all storage held by <b>d</b>.
53 : **/
54 : void
55 501 : dispatch_free_(dispatch_t *d)
56 : {
57 501 : if (d == NULL)
58 : return;
59 :
60 260 : size_t n_queues = d->n_queues;
61 781 : for (size_t i = 0; i < n_queues; ++i) {
62 521 : msg_t *m, *mtmp;
63 613 : TOR_SIMPLEQ_FOREACH_SAFE(m, &d->queues[i].queue, next, mtmp) {
64 92 : dispatch_free_msg(d, m);
65 : }
66 : }
67 :
68 260 : size_t n_msgs = d->n_msgs;
69 :
70 1791 : for (size_t i = 0; i < n_msgs; ++i) {
71 1531 : tor_free(d->table[i]);
72 : }
73 260 : tor_free(d->table);
74 260 : tor_free(d->typefns);
75 260 : tor_free(d->queues);
76 :
77 : // This is the only time we will treat d->cfg as non-const.
78 : //dispatch_cfg_free_((dispatch_items_t *) d->cfg);
79 :
80 260 : tor_free(d);
81 : }
82 :
83 : /**
84 : * Tell the dispatcher to call <b>fn</b> with <b>userdata</b> whenever
85 : * <b>chan</b> becomes nonempty. Return 0 on success, -1 on error.
86 : **/
87 : int
88 23 : dispatch_set_alert_fn(dispatch_t *d, channel_id_t chan,
89 : dispatch_alertfn_t fn, void *userdata)
90 : {
91 23 : if (BUG(chan >= d->n_queues))
92 0 : return -1;
93 :
94 23 : dqueue_t *q = &d->queues[chan];
95 23 : q->alert_fn = fn;
96 23 : q->alert_fn_arg = userdata;
97 23 : return 0;
98 : }
99 :
100 : /**
101 : * Send a message on the appropriate channel notifying that channel if
102 : * necessary.
103 : *
104 : * This function takes ownership of the auxiliary data; it can't be static or
105 : * stack-allocated, and the caller is not allowed to use it afterwards.
106 : *
107 : * This function does not check the various vields of the message object for
108 : * consistency.
109 : **/
110 : int
111 4 : dispatch_send(dispatch_t *d,
112 : subsys_id_t sender,
113 : channel_id_t channel,
114 : message_id_t msg,
115 : msg_type_id_t type,
116 : msg_aux_data_t auxdata)
117 : {
118 4 : if (!d->table[msg]) {
119 : /* Fast path: nobody wants this data. */
120 :
121 1 : d->typefns[type].free_fn(auxdata);
122 1 : return 0;
123 : }
124 :
125 3 : msg_t *m = tor_malloc(sizeof(msg_t));
126 :
127 3 : m->sender = sender;
128 3 : m->channel = channel;
129 3 : m->msg = msg;
130 3 : m->type = type;
131 3 : memcpy(&m->aux_data__, &auxdata, sizeof(msg_aux_data_t));
132 :
133 3 : return dispatch_send_msg(d, m);
134 : }
135 :
136 : int
137 3 : dispatch_send_msg(dispatch_t *d, msg_t *m)
138 : {
139 3 : if (BUG(!d))
140 0 : goto err;
141 3 : if (BUG(!m))
142 0 : goto err;
143 3 : if (BUG(m->channel >= d->n_queues))
144 0 : goto err;
145 3 : if (BUG(m->msg >= d->n_msgs))
146 0 : goto err;
147 :
148 3 : dtbl_entry_t *ent = d->table[m->msg];
149 3 : if (ent) {
150 3 : if (BUG(m->type != ent->type))
151 0 : goto err;
152 3 : if (BUG(m->channel != ent->channel))
153 0 : goto err;
154 : }
155 :
156 3 : return dispatch_send_msg_unchecked(d, m);
157 : err:
158 : /* Probably it isn't safe to free m, since type could be wrong. */
159 : return -1;
160 : }
161 :
162 : /**
163 : * Send a message on the appropriate queue, notifying that queue if necessary.
164 : *
165 : * This function takes ownership of the message object and its auxiliary data;
166 : * it can't be static or stack-allocated, and the caller isn't allowed to use
167 : * it afterwards.
168 : *
169 : * This function does not check the various fields of the message object for
170 : * consistency, and can crash if they are out of range. Only functions that
171 : * have already constructed the message in a safe way, or checked it for
172 : * correctness themselves, should call this function.
173 : **/
174 : int
175 150 : dispatch_send_msg_unchecked(dispatch_t *d, msg_t *m)
176 : {
177 : /* Find the right queue. */
178 150 : dqueue_t *q = &d->queues[m->channel];
179 150 : bool was_empty = TOR_SIMPLEQ_EMPTY(&q->queue);
180 :
181 : /* Append the message. */
182 150 : TOR_SIMPLEQ_INSERT_TAIL(&q->queue, m, next);
183 :
184 150 : if (debug_logging_enabled()) {
185 150 : char *arg = dispatch_fmt_msg_data(d, m);
186 150 : log_debug(LD_MESG,
187 : "Queued: %s (%s) from %s, on %s.",
188 : get_message_id_name(m->msg),
189 : arg,
190 : get_subsys_id_name(m->sender),
191 : get_channel_id_name(m->channel));
192 150 : tor_free(arg);
193 : }
194 :
195 : /* If we just made the queue nonempty for the first time, call the alert
196 : * function. */
197 150 : if (was_empty) {
198 50 : q->alert_fn(d, m->channel, q->alert_fn_arg);
199 : }
200 :
201 150 : return 0;
202 : }
203 :
204 : /**
205 : * Run all of the callbacks on <b>d</b> associated with <b>m</b>.
206 : **/
207 : static void
208 58 : dispatcher_run_msg_cbs(const dispatch_t *d, msg_t *m)
209 : {
210 58 : tor_assert(m->msg <= d->n_msgs);
211 58 : dtbl_entry_t *ent = d->table[m->msg];
212 58 : int n_fns = ent->n_fns;
213 :
214 58 : if (debug_logging_enabled()) {
215 58 : char *arg = dispatch_fmt_msg_data(d, m);
216 58 : log_debug(LD_MESG,
217 : "Delivering: %s (%s) from %s, on %s:",
218 : get_message_id_name(m->msg),
219 : arg,
220 : get_subsys_id_name(m->sender),
221 : get_channel_id_name(m->channel));
222 58 : tor_free(arg);
223 : }
224 :
225 58 : int i;
226 1128 : for (i=0; i < n_fns; ++i) {
227 1070 : if (ent->rcv[i].enabled) {
228 1070 : log_debug(LD_MESG, " Delivering to %s.",
229 : get_subsys_id_name(ent->rcv[i].sys));
230 1070 : ent->rcv[i].fn(m);
231 : }
232 : }
233 58 : }
234 :
235 : /**
236 : * Run up to <b>max_msgs</b> callbacks for messages on the channel <b>ch</b>
237 : * on the given dispatcher. Return 0 on success or recoverable failure,
238 : * -1 on unrecoverable error.
239 : **/
240 : int
241 50 : dispatch_flush(dispatch_t *d, channel_id_t ch, int max_msgs)
242 : {
243 50 : if (BUG(ch >= d->n_queues))
244 0 : return 0;
245 :
246 50 : int n_flushed = 0;
247 50 : dqueue_t *q = &d->queues[ch];
248 :
249 108 : while (n_flushed < max_msgs) {
250 107 : msg_t *m = TOR_SIMPLEQ_FIRST(&q->queue);
251 107 : if (!m)
252 : break;
253 58 : TOR_SIMPLEQ_REMOVE_HEAD(&q->queue, next);
254 58 : dispatcher_run_msg_cbs(d, m);
255 58 : dispatch_free_msg(d, m);
256 58 : ++n_flushed;
257 : }
258 :
259 : return 0;
260 : }
|