Line data Source code
1 : /* Copyright (c) 2018-2021, The Tor Project, Inc. */
2 : /* See LICENSE for licensing information */
3 :
4 : #define DISPATCH_PRIVATE
5 :
6 : #include "test/test.h"
7 :
8 : #include "lib/dispatch/dispatch.h"
9 : #include "lib/dispatch/dispatch_naming.h"
10 : #include "lib/dispatch/dispatch_st.h"
11 : #include "lib/dispatch/msgtypes.h"
12 : #include "lib/pubsub/pubsub_flags.h"
13 : #include "lib/pubsub/pub_binding_st.h"
14 : #include "lib/pubsub/pubsub_build.h"
15 : #include "lib/pubsub/pubsub_builder_st.h"
16 : #include "lib/pubsub/pubsub_connect.h"
17 : #include "lib/pubsub/pubsub_publish.h"
18 :
19 : #include "lib/log/escape.h"
20 : #include "lib/malloc/malloc.h"
21 : #include "lib/string/printf.h"
22 :
23 : #include <stdio.h>
24 : #include <string.h>
25 :
/** Formatting hook for the "string" message type used by these tests:
 * returns whatever esc_for_log() produces for the payload in aux.ptr.
 * Installed as the fmt_fn member of stringfns. */
26 : static char *
27 120 : ex_str_fmt(msg_aux_data_t aux)
28 : {
29 120 : return esc_for_log(aux.ptr);
30 : }
/** Free hook for the "string" message type used by these tests: releases
 * the payload held in aux.ptr with tor_free_().
 * Installed as the free_fn member of stringfns. */
31 : static void
32 107 : ex_str_free(msg_aux_data_t aux)
33 : {
34 107 : tor_free_(aux.ptr);
35 107 : }
/** Type-function table for the "string" message type: pairs the free and
 * format hooks above.  setup_dispatcher() registers it under the type id
 * returned by get_msg_type_id("string"). */
36 : static dispatch_typefns_t stringfns = {
37 : .free_fn = ex_str_free,
38 : .fmt_fn = ex_str_fmt
39 : };
40 :
41 : // We're using the lowest-level publish/subscribe logic here, to avoid the
42 : // pubsub_macros.h macros and just test the dispatch core. We'll use a string
43 : // type for everything.
44 :
/* DECLARE_MESSAGE(suffix): define the per-message test fixtures:
 *   - pub_binding_<suffix>:  the publish binding used by ADD_PUBLISH and SEND;
 *   - msg_received_<suffix>: a counter, initially 0;
 *   - recv_msg_<suffix>():   a receive hook that ignores the message it is
 *                            given and increments the counter.
 * The trailing EAT_SEMICOLON presumably absorbs the ';' written after each
 * use of the macro (it is defined elsewhere -- confirm). */
45 : #define DECLARE_MESSAGE(suffix) \
46 : static pub_binding_t pub_binding_##suffix; \
47 : static int msg_received_##suffix = 0; \
48 : static void recv_msg_##suffix(const msg_t *m) { \
49 : (void)m; \
50 : ++msg_received_##suffix; \
51 : } \
52 : EAT_SEMICOLON
53 :
/* ADD_PUBLISH(binding_suffix, subsys, channel, msg, flags): register
 * pub_binding_<binding_suffix> as a publisher of message <msg> on channel
 * <channel>, on behalf of subsystem <subsys>, with message type "string".
 *
 * <subsys>, <channel> and <msg> are bare identifiers: they are stringified
 * and handed to get_subsys_id(), get_channel_id() and get_message_id().
 * <flags> is passed through to pubsub_add_pub_() unchanged.
 *
 * Relies on variables named `builder` and `con` being in scope at the call
 * site.  The connector obtained here is freed again before the macro ends. */
54 : #define ADD_PUBLISH(binding_suffix, subsys, channel, msg, flags) \
55 : STMT_BEGIN { \
56 : con = pubsub_connector_for_subsystem(builder, \
57 : get_subsys_id(#subsys)); \
58 : pubsub_add_pub_(con, &pub_binding_##binding_suffix, \
59 : get_channel_id(#channel), \
60 : get_message_id(#msg), get_msg_type_id("string"), \
61 : (flags), __FILE__, __LINE__); \
62 : pubsub_connector_free(con); \
63 : } STMT_END
64 :
/* ADD_SUBSCRIBE(hook_suffix, subsys, channel, msg, flags): register
 * recv_msg_<hook_suffix>() as a subscriber to message <msg> on channel
 * <channel>, on behalf of subsystem <subsys>, with message type "string".
 *
 * <subsys>, <channel> and <msg> are bare identifiers: they are stringified
 * and handed to get_subsys_id(), get_channel_id() and get_message_id().
 * <flags> is passed through to pubsub_add_sub_() unchanged (the tests use
 * either 0 or DISP_FLAG_STUB).
 *
 * Relies on variables named `builder` and `con` being in scope at the call
 * site.  The connector obtained here is freed again before the macro ends. */
65 : #define ADD_SUBSCRIBE(hook_suffix, subsys, channel, msg, flags) \
66 : STMT_BEGIN { \
67 : con = pubsub_connector_for_subsystem(builder, \
68 : get_subsys_id(#subsys)); \
69 : pubsub_add_sub_(con, recv_msg_##hook_suffix, \
70 : get_channel_id(#channel), \
71 : get_message_id(#msg), get_msg_type_id("string"), \
72 : (flags), __FILE__, __LINE__); \
73 : pubsub_connector_free(con); \
74 : } STMT_END
75 :
/* SEND(binding_suffix, val): publish one message through
 * pub_binding_<binding_suffix>.  The payload is a fresh tor_strdup() copy
 * of <val>, carried in the ptr member of a msg_aux_data_t.
 *
 * NOTE(review): the copy is not freed here; presumably the dispatcher
 * releases it through the "string" type's free_fn (ex_str_free) -- confirm
 * against pubsub_pub_(). */
76 : #define SEND(binding_suffix, val) \
77 : STMT_BEGIN { \
78 : msg_aux_data_t data_; \
79 : data_.ptr = tor_strdup(val); \
80 : pubsub_pub_(&pub_binding_##binding_suffix, data_); \
81 : } STMT_END
82 :
/* Instantiate the publish binding, receive counter and receive hook for
 * each of the five messages that setup_dispatcher() wires up. */
83 11 : DECLARE_MESSAGE(msg1);
84 0 : DECLARE_MESSAGE(msg2);
85 6 : DECLARE_MESSAGE(msg3);
86 0 : DECLARE_MESSAGE(msg4);
87 5 : DECLARE_MESSAGE(msg5);
88 :
/** List that collects copies of received string payloads.  It is NULL
 * until test_pubsub_msg_many_hooks() allocates it, and that test also
 * frees it. */
89 : static smartlist_t *strings_received = NULL;
/** Receive hook: append a tor_strdup() copy of the message's string
 * payload (m->aux_data__.ptr) to strings_received.
 * NOTE(review): assumes strings_received has already been allocated when a
 * message is delivered -- the hook itself does not check. */
90 : static void
91 995 : recv_msg_copy_string(const msg_t *m)
92 : {
93 995 : const char *s = m->aux_data__.ptr;
94 995 : smartlist_add(strings_received, tor_strdup(s));
95 995 : }
96 :
/** Per-test setup function: build the dispatcher that every test in this
 * file runs against.
 *
 * Registers the "string" message type, then declares the publishers and
 * subscribers for message1 through message5 as described inline below.
 * <b>testcase</b> is unused.
 *
 * Returns the result of pubsub_builder_finalize(), which the tests receive
 * as their argument and treat as a dispatch_t pointer. */
97 : static void *
98 5 : setup_dispatcher(const struct testcase_t *testcase)
99 : {
100 5 : (void)testcase;
101 5 : pubsub_builder_t *builder = pubsub_builder_new();
102 5 : pubsub_connector_t *con;
103 :
/* Register the "string" type and its free/format hooks, on behalf of a
 * subsystem named "types".  "nowhere.c" and 99 look like a placeholder
 * source location -- confirm against pubsub_connector_register_type_(). */
104 : {
105 5 : con = pubsub_connector_for_subsystem(builder, get_subsys_id("types"));
106 5 : pubsub_connector_register_type_(con,
107 5 : get_msg_type_id("string"),
108 : &stringfns,
109 : "nowhere.c", 99);
110 5 : pubsub_connector_free(con);
111 : }
112 : // message1 has one publisher and one subscriber.
113 5 : ADD_PUBLISH(msg1, sys1, main, message1, 0);
114 5 : ADD_SUBSCRIBE(msg1, sys2, main, message1, 0);
115 :
116 : // message2 has a publisher and a stub subscriber.
117 5 : ADD_PUBLISH(msg2, sys1, main, message2, 0);
118 5 : ADD_SUBSCRIBE(msg2, sys2, main, message2, DISP_FLAG_STUB);
119 :
120 : // message3 has a publisher and three subscribers.
121 5 : ADD_PUBLISH(msg3, sys1, main, message3, 0);
122 5 : ADD_SUBSCRIBE(msg3, sys2, main, message3, 0);
123 5 : ADD_SUBSCRIBE(msg3, sys3, main, message3, 0);
124 5 : ADD_SUBSCRIBE(msg3, sys4, main, message3, 0);
125 :
126 : // message4 has one publisher and two subscribers, but it's on another
127 : // channel.
128 5 : ADD_PUBLISH(msg4, sys2, other, message4, 0);
129 5 : ADD_SUBSCRIBE(msg4, sys1, other, message4, 0);
130 5 : ADD_SUBSCRIBE(msg4, sys3, other, message4, 0);
131 :
132 : // message5 has a huge number of recipients.
/* Five subscribers use the counting hook recv_msg_msg5()... */
133 5 : ADD_PUBLISH(msg5, sys3, main, message5, 0);
134 5 : ADD_SUBSCRIBE(msg5, sys4, main, message5, 0);
135 5 : ADD_SUBSCRIBE(msg5, sys5, main, message5, 0);
136 5 : ADD_SUBSCRIBE(msg5, sys6, main, message5, 0);
137 5 : ADD_SUBSCRIBE(msg5, sys7, main, message5, 0);
138 5 : ADD_SUBSCRIBE(msg5, sys8, main, message5, 0);
/* ...and 995 more use recv_msg_copy_string(), each from its own subsystem
 * named "xsys-<i>", for 1000 subscribers in total.  "here" and 100 sit in
 * the argument positions where ADD_SUBSCRIBE passes __FILE__ and
 * __LINE__. */
139 4980 : for (int i = 0; i < 1000-5; ++i) {
140 4975 : char *sys;
141 4975 : tor_asprintf(&sys, "xsys-%d", i);
142 4975 : con = pubsub_connector_for_subsystem(builder, get_subsys_id(sys));
143 4975 : pubsub_add_sub_(con, recv_msg_copy_string,
144 4975 : get_channel_id("main"),
145 4975 : get_message_id("message5"),
146 4975 : get_msg_type_id("string"), 0, "here", 100);
147 4975 : pubsub_connector_free(con);
148 4975 : tor_free(sys);
149 : }
150 :
151 5 : return pubsub_builder_finalize(builder, NULL);
152 : }
153 :
/** Per-test cleanup function: free the dispatcher that setup_dispatcher()
 * returned, passed in here as <b>dispatcher_</b>.  <b>testcase</b> is
 * unused.
 *
 * Always returns 1 (presumably the test framework's "cleanup succeeded"
 * value -- confirm). */
154 : static int
155 5 : cleanup_dispatcher(const struct testcase_t *testcase, void *dispatcher_)
156 : {
157 5 : (void)testcase;
158 5 : dispatch_t *dispatcher = dispatcher_;
159 5 : dispatch_free(dispatcher);
160 5 : return 1;
161 : }
162 :
/** Setup/cleanup pair that the T() macro attaches to every test case in
 * this file. */
163 : static const struct testcase_setup_t dispatcher_setup = {
164 : setup_dispatcher, cleanup_dispatcher
165 : };
166 :
/** Test: publish a single message1 and check that its one subscriber is
 * not called at publish time, but is called exactly once after the "main"
 * channel is flushed.  <b>arg</b> is the dispatcher from
 * setup_dispatcher(). */
167 : static void
168 1 : test_pubsub_msg_minimal(void *arg)
169 : {
170 1 : dispatch_t *d = arg;
171 :
172 1 : tt_int_op(0, OP_EQ, msg_received_msg1);
173 1 : SEND(msg1, "hello world");
174 1 : tt_int_op(0, OP_EQ, msg_received_msg1); // hasn't actually arrived yet.
175 :
176 1 : tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000));
177 1 : tt_int_op(1, OP_EQ, msg_received_msg1); // we got the message!
178 :
179 1 : done:
180 1 : ;
181 1 : }
182 :
/** Test: publish message2, whose only subscriber was added with
 * DISP_FLAG_STUB, and check that the subscriber's counter is still 0 even
 * after the "main" channel is flushed successfully.  <b>arg</b> is the
 * dispatcher from setup_dispatcher(). */
183 : static void
184 1 : test_pubsub_msg_send_to_stub(void *arg)
185 : {
186 1 : dispatch_t *d = arg;
187 :
188 1 : tt_int_op(0, OP_EQ, msg_received_msg2);
189 1 : SEND(msg2, "hello silence");
190 1 : tt_int_op(0, OP_EQ, msg_received_msg2); // hasn't actually arrived yet.
191 :
192 1 : tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000));
193 1 : tt_int_op(0, OP_EQ, msg_received_msg2); // doesn't arrive -- stub hook.
194 :
195 1 : done:
196 1 : ;
197 1 : }
198 :
/** Test: queue 100 copies of message1, flush "main" with a limit of 10,
 * and check that exactly 10 were delivered.  The test then returns with
 * the remaining messages still queued, so that the cleanup function frees
 * a dispatcher that has undelivered messages.  <b>arg</b> is the
 * dispatcher from setup_dispatcher(). */
199 : static void
200 1 : test_pubsub_msg_cancel_msgs(void *arg)
201 : {
202 1 : dispatch_t *d = arg;
203 :
204 1 : tt_int_op(0, OP_EQ, msg_received_msg1);
205 101 : for (int i = 0; i < 100; ++i) {
206 100 : SEND(msg1, "hello world");
207 : }
208 1 : tt_int_op(0, OP_EQ, msg_received_msg1); // hasn't actually arrived yet.
209 :
/* The last argument to dispatch_flush() is 10 here, not 1000 as in the
 * other tests; the check below expects it to cap delivery at 10. */
210 1 : tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 10));
211 1 : tt_int_op(10, OP_EQ, msg_received_msg1); // we got the message 10 times.
212 :
213 : // At this point, the dispatcher will be freed with queued, undelivered
214 : // messages.
215 1 : done:
216 1 : ;
217 1 : }
218 :
/** State handed to alertfn_generic() as its <b>arg</b>: the dispatcher and
 * channel the alert is expected to be for, plus a count of how many times
 * the alert function has run with matching arguments. */
219 : struct alertfn_target {
220 : dispatch_t *d;
221 : channel_id_t ch;
222 : int count;
223 : };
/** Alert function installed with dispatch_set_alert_fn() by
 * test_pubsub_msg_alertfns().  <b>arg</b> is a struct alertfn_target.
 *
 * Checks that <b>d</b> and <b>ch</b> match the target's expected values and
 * then increments the target's count.  The increment sits after the checks
 * and before the done label, so a failed check (which presumably jumps to
 * done -- confirm against the tt_*_op macros) leaves count unchanged. */
224 : static void
225 3 : alertfn_generic(dispatch_t *d, channel_id_t ch, void *arg)
226 : {
227 3 : struct alertfn_target *t = arg;
228 3 : tt_ptr_op(d, OP_EQ, t->d);
229 3 : tt_int_op(ch, OP_EQ, t->ch);
230 3 : ++t->count;
231 3 : done:
232 3 : ;
233 3 : }
234 :
/** Test the per-channel alert functions.  <b>arg</b> is the dispatcher
 * from setup_dispatcher().
 *
 * Installs alertfn_generic() on both "main" and "other", each with its own
 * counter, and checks that:
 *  - the first message queued on a channel triggers that channel's alert;
 *  - a further message queued on the same channel before a flush does not;
 *  - alerts on one channel do not touch the other channel's counter;
 *  - once "main" has been flushed, the next message queued on it triggers
 *    the alert again.
 * The "other" channel is never flushed in this test. */
235 : static void
236 1 : test_pubsub_msg_alertfns(void *arg)
237 : {
238 1 : dispatch_t *d = arg;
/* One target per channel; each records the dispatcher and channel that
 * alertfn_generic() should be called with, and starts at count 0. */
239 1 : struct alertfn_target ch1_a = { d, get_channel_id("main"), 0 };
240 1 : struct alertfn_target ch2_a = { d, get_channel_id("other"), 0 };
241 :
242 1 : tt_int_op(0, OP_EQ,
243 : dispatch_set_alert_fn(d, get_channel_id("main"),
244 : alertfn_generic, &ch1_a));
245 1 : tt_int_op(0, OP_EQ,
246 : dispatch_set_alert_fn(d, get_channel_id("other"),
247 : alertfn_generic, &ch2_a));
248 :
/* message3 is published on "main". */
249 1 : SEND(msg3, "hello");
250 1 : tt_int_op(ch1_a.count, OP_EQ, 1);
251 1 : SEND(msg3, "world");
252 1 : tt_int_op(ch1_a.count, OP_EQ, 1); // only the first message sends an alert
253 1 : tt_int_op(ch2_a.count, OP_EQ, 0); // no alert for 'other'
254 :
/* message4 is published on "other". */
255 1 : SEND(msg4, "worse things happen in C");
256 1 : tt_int_op(ch2_a.count, OP_EQ, 1);
257 :
258 : // flush the first (main) channel...
259 1 : tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000));
260 1 : tt_int_op(6, OP_EQ, msg_received_msg3); // 3 subscribers, 2 instances.
261 :
262 : // now that the main channel is flushed, sending another message on it
263 : // starts another alert.
264 1 : tt_int_op(ch1_a.count, OP_EQ, 1);
265 1 : SEND(msg1, "plover");
266 1 : tt_int_op(ch1_a.count, OP_EQ, 2);
267 1 : tt_int_op(ch2_a.count, OP_EQ, 1);
268 :
269 1 : done:
270 1 : ;
271 1 : }
272 :
273 : /* try more than N_FAST_FNS hooks on msg5 */
/* Publishes one message5, which setup_dispatcher() gave 1000 subscribers:
 * five counting hooks (recv_msg_msg5) and 995 recv_msg_copy_string hooks.
 * Nothing may be delivered before the flush; afterwards the counter must
 * read 5 and strings_received must hold 995 copied payloads.
 * <b>arg</b> is the dispatcher from setup_dispatcher(). */
274 : static void
275 1 : test_pubsub_msg_many_hooks(void *arg)
276 : {
277 1 : dispatch_t *d = arg;
/* Allocated before any check that can jump to done, so the cleanup code
 * there always has a list to work on. */
278 1 : strings_received = smartlist_new();
279 :
280 1 : tt_int_op(0, OP_EQ, msg_received_msg5);
281 1 : SEND(msg5, "hello world");
282 1 : tt_int_op(0, OP_EQ, msg_received_msg5);
283 1 : tt_int_op(0, OP_EQ, smartlist_len(strings_received));
284 :
285 1 : tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 100000));
286 1 : tt_int_op(5, OP_EQ, msg_received_msg5);
287 1 : tt_int_op(995, OP_EQ, smartlist_len(strings_received));
288 :
289 1 : done:
/* Free every copied payload, then the list itself. */
290 996 : SMARTLIST_FOREACH(strings_received, char *, s, tor_free(s));
291 1 : smartlist_free(strings_received);
292 1 : }
293 :
/* T(name): build the test-table entry for test_pubsub_msg_<name>().  The
 * entry is named "<name>", carries the TT_FORK flag, uses dispatcher_setup
 * for setup and cleanup, and passes NULL as its last initializer. */
294 : #define T(name) \
295 : { #name, test_pubsub_msg_ ## name , TT_FORK, \
296 : &dispatcher_setup, NULL }
297 :
/** Table of the test cases defined in this file, terminated by
 * END_OF_TESTCASES.  It has external linkage (it is not static). */
298 : struct testcase_t pubsub_msg_tests[] = {
299 : T(minimal),
300 : T(send_to_stub),
301 : T(cancel_msgs),
302 : T(alertfns),
303 : T(many_hooks),
304 : END_OF_TESTCASES
305 : };
|