Line data Source code
1 : /* Copyright (c) 2001-2004, Roger Dingledine.
2 : * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
3 : * Copyright (c) 2007-2021, The Tor Project, Inc. */
4 : /* See LICENSE for licensing information */
5 :
6 : #include "core/or/or.h"
7 : #include "lib/thread/threads.h"
8 : #include "core/or/onion.h"
9 : #include "lib/evloop/workqueue.h"
10 : #include "lib/crypt_ops/crypto_curve25519.h"
11 : #include "lib/crypt_ops/crypto_rand.h"
12 : #include "lib/net/alertsock.h"
13 : #include "lib/evloop/compat_libevent.h"
14 : #include "lib/intmath/weakrng.h"
15 : #include "lib/crypt_ops/crypto_init.h"
16 :
17 : #include <stdio.h>
18 :
19 : #define MAX_INFLIGHT (1<<16)
20 :
21 : static int opt_verbose = 0;
22 : static int opt_n_threads = 8;
23 : static int opt_n_items = 10000;
24 : static int opt_n_inflight = 1000;
25 : static int opt_n_lowwater = 250;
26 : static int opt_n_cancel = 0;
27 : static int opt_ratio_rsa = 5;
28 :
29 : #ifdef TRACK_RESPONSES
30 : tor_mutex_t bitmap_mutex;
31 : int handled_len;
32 : bitarray_t *handled;
33 : #endif
34 :
35 : typedef struct state_t {
36 : int magic;
37 : int n_handled;
38 : crypto_pk_t *rsa;
39 : curve25519_secret_key_t ecdh;
40 : int is_shutdown;
41 : } state_t;
42 :
43 : typedef struct rsa_work_t {
44 : int serial;
45 : uint8_t msg[128];
46 : uint8_t msglen;
47 : } rsa_work_t;
48 :
49 : typedef struct ecdh_work_t {
50 : int serial;
51 : union {
52 : curve25519_public_key_t pk;
53 : uint8_t msg[32];
54 : } u;
55 : } ecdh_work_t;
56 :
57 : static void
58 : mark_handled(int serial)
59 : {
60 : #ifdef TRACK_RESPONSES
61 : tor_mutex_acquire(&bitmap_mutex);
62 : tor_assert(serial < handled_len);
63 : tor_assert(! bitarray_is_set(handled, serial));
64 : bitarray_set(handled, serial);
65 : tor_mutex_release(&bitmap_mutex);
66 : #else /* !defined(TRACK_RESPONSES) */
67 : (void)serial;
68 : #endif /* defined(TRACK_RESPONSES) */
69 : }
70 :
71 : static workqueue_reply_t
72 14127 : workqueue_do_rsa(void *state, void *work)
73 : {
74 14127 : rsa_work_t *rw = work;
75 14127 : state_t *st = state;
76 14127 : crypto_pk_t *rsa = st->rsa;
77 14127 : uint8_t sig[256];
78 14127 : int len;
79 :
80 14127 : tor_assert(st->magic == 13371337);
81 :
82 27835 : len = crypto_pk_private_sign(rsa, (char*)sig, 256,
83 14127 : (char*)rw->msg, rw->msglen);
84 13708 : if (len < 0) {
85 0 : rw->msglen = 0;
86 0 : return WQ_RPL_ERROR;
87 : }
88 :
89 13708 : memset(rw->msg, 0, sizeof(rw->msg));
90 13708 : rw->msglen = len;
91 13708 : memcpy(rw->msg, sig, len);
92 13708 : ++st->n_handled;
93 :
94 13708 : mark_handled(rw->serial);
95 :
96 13708 : return WQ_RPL_REPLY;
97 : }
98 :
99 : static workqueue_reply_t
100 54 : workqueue_do_shutdown(void *state, void *work)
101 : {
102 54 : (void)state;
103 54 : (void)work;
104 54 : crypto_pk_free(((state_t*)state)->rsa);
105 53 : tor_free(state);
106 53 : return WQ_RPL_SHUTDOWN;
107 : }
108 :
109 : static workqueue_reply_t
110 55855 : workqueue_do_ecdh(void *state, void *work)
111 : {
112 55855 : ecdh_work_t *ew = work;
113 55855 : uint8_t output[CURVE25519_OUTPUT_LEN];
114 55855 : state_t *st = state;
115 :
116 55855 : tor_assert(st->magic == 13371337);
117 :
118 55855 : curve25519_handshake(output, &st->ecdh, &ew->u.pk);
119 55223 : memcpy(ew->u.msg, output, CURVE25519_OUTPUT_LEN);
120 55223 : ++st->n_handled;
121 55223 : mark_handled(ew->serial);
122 55223 : return WQ_RPL_REPLY;
123 : }
124 :
125 : static workqueue_reply_t
126 0 : workqueue_shutdown_error(void *state, void *work)
127 : {
128 0 : (void)state;
129 0 : (void)work;
130 0 : return WQ_RPL_REPLY;
131 : }
132 :
133 : static void *
134 56 : new_state(void *arg)
135 : {
136 56 : state_t *st;
137 56 : (void)arg;
138 :
139 56 : st = tor_malloc(sizeof(*st));
140 : /* Every thread gets its own keys. not a problem for benchmarking */
141 56 : st->rsa = crypto_pk_new();
142 56 : if (crypto_pk_generate_key_with_bits(st->rsa, 1024) < 0) {
143 0 : crypto_pk_free(st->rsa);
144 0 : tor_free(st);
145 0 : return NULL;
146 : }
147 56 : curve25519_secret_key_generate(&st->ecdh, 0);
148 56 : st->magic = 13371337;
149 56 : return st;
150 : }
151 :
152 : static void
153 0 : free_state(void *arg)
154 : {
155 0 : state_t *st = arg;
156 0 : crypto_pk_free(st->rsa);
157 0 : tor_free(st);
158 0 : }
159 :
160 : static tor_weak_rng_t weak_rng;
161 : static int n_sent = 0;
162 : static int rsa_sent = 0;
163 : static int ecdh_sent = 0;
164 : static int n_received_previously = 0;
165 : static int n_received = 0;
166 : static int no_shutdown = 0;
167 :
168 : #ifdef TRACK_RESPONSES
169 : bitarray_t *received;
170 : #endif
171 :
172 : static void
173 69991 : handle_reply(void *arg)
174 : {
175 : #ifdef TRACK_RESPONSES
176 : rsa_work_t *rw = arg; /* Naughty cast, but only looking at serial. */
177 : tor_assert(! bitarray_is_set(received, rw->serial));
178 : bitarray_set(received,rw->serial);
179 : #endif
180 :
181 69991 : tor_free(arg);
182 69991 : ++n_received;
183 69991 : }
184 :
185 : /* This should never get called. */
186 : static void
187 0 : handle_reply_shutdown(void *arg)
188 : {
189 0 : (void)arg;
190 0 : no_shutdown = 1;
191 0 : }
192 :
193 : static workqueue_entry_t *
194 70000 : add_work(threadpool_t *tp)
195 : {
196 140000 : int add_rsa =
197 140000 : opt_ratio_rsa == 0 ||
198 70000 : tor_weak_random_range(&weak_rng, opt_ratio_rsa) == 0;
199 :
200 70000 : if (add_rsa) {
201 14128 : rsa_work_t *w = tor_malloc_zero(sizeof(*w));
202 14128 : w->serial = n_sent++;
203 14128 : crypto_rand((char*)w->msg, 20);
204 14128 : w->msglen = 20;
205 14128 : ++rsa_sent;
206 14128 : return threadpool_queue_work_priority(tp,
207 : WQ_PRI_MED,
208 : workqueue_do_rsa, handle_reply, w);
209 : } else {
210 55872 : ecdh_work_t *w = tor_malloc_zero(sizeof(*w));
211 55872 : w->serial = n_sent++;
212 : /* Not strictly right, but this is just for benchmarks. */
213 55872 : crypto_rand((char*)w->u.pk.public_key, 32);
214 55872 : ++ecdh_sent;
215 55872 : return threadpool_queue_work(tp, workqueue_do_ecdh, handle_reply, w);
216 : }
217 : }
218 :
219 : static int n_failed_cancel = 0;
220 : static int n_successful_cancel = 0;
221 :
222 : static int
223 1045 : add_n_work_items(threadpool_t *tp, int n)
224 : {
225 1045 : int n_queued = 0;
226 1045 : int n_try_cancel = 0, i;
227 1045 : workqueue_entry_t **to_cancel;
228 1045 : workqueue_entry_t *ent;
229 :
230 : // We'll choose randomly which entries to cancel.
231 1045 : to_cancel = tor_calloc(opt_n_cancel, sizeof(workqueue_entry_t*));
232 :
233 64045 : while (n_queued++ < n) {
234 63000 : ent = add_work(tp);
235 63000 : if (! ent) {
236 0 : puts("Z");
237 0 : tor_libevent_exit_loop_after_delay(tor_libevent_get_base(), NULL);
238 0 : return -1;
239 : }
240 :
241 63000 : if (n_try_cancel < opt_n_cancel) {
242 10 : to_cancel[n_try_cancel++] = ent;
243 : } else {
244 62990 : int p = tor_weak_random_range(&weak_rng, n_queued);
245 62990 : if (p < n_try_cancel) {
246 64 : to_cancel[p] = ent;
247 : }
248 : }
249 : }
250 :
251 1055 : for (i = 0; i < n_try_cancel; ++i) {
252 10 : void *work = workqueue_entry_cancel(to_cancel[i]);
253 10 : if (! work) {
254 1 : n_failed_cancel++;
255 : } else {
256 9 : n_successful_cancel++;
257 9 : tor_free(work);
258 : }
259 : }
260 :
261 1045 : tor_free(to_cancel);
262 1045 : return 0;
263 : }
264 :
265 : static int shutting_down = 0;
266 :
267 : static void
268 18995 : replysock_readable_cb(threadpool_t *tp)
269 : {
270 18995 : if (n_received_previously == n_received)
271 : return;
272 :
273 18896 : n_received_previously = n_received;
274 :
275 18896 : if (opt_verbose) {
276 0 : printf("%d / %d", n_received, n_sent);
277 0 : if (opt_n_cancel)
278 0 : printf(" (%d cancelled, %d uncancellable)",
279 : n_successful_cancel, n_failed_cancel);
280 0 : puts("");
281 : }
282 : #ifdef TRACK_RESPONSES
283 : tor_mutex_acquire(&bitmap_mutex);
284 : for (i = 0; i < opt_n_items; ++i) {
285 : if (bitarray_is_set(received, i))
286 : putc('o', stdout);
287 : else if (bitarray_is_set(handled, i))
288 : putc('!', stdout);
289 : else
290 : putc('.', stdout);
291 : }
292 : puts("");
293 : tor_mutex_release(&bitmap_mutex);
294 : #endif /* defined(TRACK_RESPONSES) */
295 :
296 18896 : if (n_sent - (n_received+n_successful_cancel) < opt_n_lowwater) {
297 1045 : int n_to_send = n_received + opt_n_inflight - n_sent;
298 1045 : if (n_to_send > opt_n_items - n_sent)
299 : n_to_send = opt_n_items - n_sent;
300 1045 : add_n_work_items(tp, n_to_send);
301 : }
302 :
303 18896 : if (shutting_down == 0 &&
304 18896 : n_received+n_successful_cancel == n_sent &&
305 7 : n_sent >= opt_n_items) {
306 7 : shutting_down = 1;
307 7 : threadpool_queue_update(tp, NULL,
308 : workqueue_do_shutdown, NULL, NULL);
309 : // Anything we add after starting the shutdown must not be executed.
310 7 : threadpool_queue_work(tp, workqueue_shutdown_error,
311 : handle_reply_shutdown, NULL);
312 : {
313 7 : struct timeval limit = { 2, 0 };
314 7 : tor_libevent_exit_loop_after_delay(tor_libevent_get_base(), &limit);
315 : }
316 : }
317 : }
318 :
319 : static void
320 0 : help(void)
321 : {
322 0 : puts(
323 : "Options:\n"
324 : " -h Display this information\n"
325 : " -v Be verbose\n"
326 : " -N <items> Run this many items of work\n"
327 : " -T <threads> Use this many threads\n"
328 : " -I <inflight> Have no more than this many requests queued at once\n"
329 : " -L <lowwater> Add items whenever fewer than this many are pending\n"
330 : " -C <cancel> Try to cancel N items of every batch that we add\n"
331 : " -R <ratio> Make one out of this many items be a slow (RSA) one\n"
332 : " --no-{eventfd2,eventfd,pipe2,pipe,socketpair}\n"
333 : " Disable one of the alert_socket backends.");
334 0 : }
335 :
336 : int
337 7 : main(int argc, char **argv)
338 : {
339 7 : replyqueue_t *rq;
340 7 : threadpool_t *tp;
341 7 : int i;
342 7 : tor_libevent_cfg_t evcfg;
343 7 : uint32_t as_flags = 0;
344 :
345 28 : for (i = 1; i < argc; ++i) {
346 21 : if (!strcmp(argv[i], "-v")) {
347 0 : opt_verbose = 1;
348 21 : } else if (!strcmp(argv[i], "-T") && i+1<argc) {
349 0 : opt_n_threads = atoi(argv[++i]);
350 21 : } else if (!strcmp(argv[i], "-N") && i+1<argc) {
351 0 : opt_n_items = atoi(argv[++i]);
352 21 : } else if (!strcmp(argv[i], "-I") && i+1<argc) {
353 0 : opt_n_inflight = atoi(argv[++i]);
354 21 : } else if (!strcmp(argv[i], "-L") && i+1<argc) {
355 0 : opt_n_lowwater = atoi(argv[++i]);
356 21 : } else if (!strcmp(argv[i], "-R") && i+1<argc) {
357 0 : opt_ratio_rsa = atoi(argv[++i]);
358 21 : } else if (!strcmp(argv[i], "-C") && i+1<argc) {
359 1 : opt_n_cancel = atoi(argv[++i]);
360 20 : } else if (!strcmp(argv[i], "--no-eventfd2")) {
361 4 : as_flags |= ASOCKS_NOEVENTFD2;
362 16 : } else if (!strcmp(argv[i], "--no-eventfd")) {
363 4 : as_flags |= ASOCKS_NOEVENTFD;
364 12 : } else if (!strcmp(argv[i], "--no-pipe2")) {
365 4 : as_flags |= ASOCKS_NOPIPE2;
366 8 : } else if (!strcmp(argv[i], "--no-pipe")) {
367 4 : as_flags |= ASOCKS_NOPIPE;
368 4 : } else if (!strcmp(argv[i], "--no-socketpair")) {
369 4 : as_flags |= ASOCKS_NOSOCKETPAIR;
370 0 : } else if (!strcmp(argv[i], "-h")) {
371 0 : help();
372 7 : return 0;
373 : } else {
374 0 : help();
375 0 : return 1;
376 : }
377 : }
378 :
379 7 : if (opt_n_threads < 1 ||
380 7 : opt_n_items < 1 || opt_n_inflight < 1 || opt_n_lowwater < 0 ||
381 7 : opt_n_cancel > opt_n_inflight || opt_n_inflight > MAX_INFLIGHT ||
382 7 : opt_ratio_rsa < 0) {
383 0 : help();
384 0 : return 1;
385 : }
386 :
387 7 : if (opt_n_inflight > opt_n_items) {
388 0 : opt_n_inflight = opt_n_items;
389 : }
390 :
391 7 : init_logging(1);
392 7 : network_init();
393 7 : if (crypto_global_init(1, NULL, NULL) < 0) {
394 0 : printf("Couldn't initialize crypto subsystem; exiting.\n");
395 0 : return 1;
396 : }
397 7 : if (crypto_seed_rng() < 0) {
398 0 : printf("Couldn't seed RNG; exiting.\n");
399 0 : return 1;
400 : }
401 :
402 7 : rq = replyqueue_new(as_flags);
403 7 : if (as_flags && rq == NULL)
404 : return 77; // 77 means "skipped".
405 :
406 7 : tor_assert(rq);
407 7 : tp = threadpool_new(opt_n_threads,
408 : rq, new_state, free_state, NULL);
409 7 : tor_assert(tp);
410 :
411 7 : crypto_seed_weak_rng(&weak_rng);
412 :
413 7 : memset(&evcfg, 0, sizeof(evcfg));
414 7 : tor_libevent_initialize(&evcfg);
415 :
416 : {
417 7 : int r = threadpool_register_reply_event(tp,
418 : replysock_readable_cb);
419 7 : tor_assert(r == 0);
420 : }
421 :
422 : #ifdef TRACK_RESPONSES
423 : handled = bitarray_init_zero(opt_n_items);
424 : received = bitarray_init_zero(opt_n_items);
425 : tor_mutex_init(&bitmap_mutex);
426 : handled_len = opt_n_items;
427 : #endif /* defined(TRACK_RESPONSES) */
428 :
429 7007 : for (i = 0; i < opt_n_inflight; ++i) {
430 7000 : if (! add_work(tp)) {
431 0 : puts("Couldn't add work.");
432 0 : return 1;
433 : }
434 : }
435 :
436 : {
437 7 : struct timeval limit = { 180, 0 };
438 7 : tor_libevent_exit_loop_after_delay(tor_libevent_get_base(), &limit);
439 : }
440 :
441 7 : tor_libevent_run_event_loop(tor_libevent_get_base(), 0);
442 :
443 7 : if (n_sent != opt_n_items || n_received+n_successful_cancel != n_sent) {
444 0 : printf("%d vs %d\n", n_sent, opt_n_items);
445 0 : printf("%d+%d vs %d\n", n_received, n_successful_cancel, n_sent);
446 0 : puts("FAIL");
447 0 : return 1;
448 7 : } else if (no_shutdown) {
449 0 : puts("Accepted work after shutdown\n");
450 0 : puts("FAIL");
451 : } else {
452 7 : puts("OK");
453 7 : return 0;
454 : }
455 : }
|