LCOV - code coverage report
Current view: top level - test - test_workqueue.c (source / functions) Hit Total Coverage
Test: lcov.info Lines: 153 206 74.3 %
Date: 2021-11-24 03:28:48 Functions: 9 13 69.2 %

          Line data    Source code
       1             : /* Copyright (c) 2001-2004, Roger Dingledine.
       2             :  * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
       3             :  * Copyright (c) 2007-2021, The Tor Project, Inc. */
       4             : /* See LICENSE for licensing information */
       5             : 
       6             : #include "core/or/or.h"
       7             : #include "lib/thread/threads.h"
       8             : #include "core/or/onion.h"
       9             : #include "lib/evloop/workqueue.h"
      10             : #include "lib/crypt_ops/crypto_curve25519.h"
      11             : #include "lib/crypt_ops/crypto_rand.h"
      12             : #include "lib/net/alertsock.h"
      13             : #include "lib/evloop/compat_libevent.h"
      14             : #include "lib/intmath/weakrng.h"
      15             : #include "lib/crypt_ops/crypto_init.h"
      16             : 
      17             : #include <stdio.h>
      18             : 
      19             : #define MAX_INFLIGHT (1<<16) /* Largest -I value that main() accepts. */
      20             : 
                       /* Command-line options.  The initializers are the defaults used when
                        * the matching flag is not given; main() overwrites them while parsing
                        * argv. */
      21             : static int opt_verbose = 0; /* -v: print progress when replies arrive. */
      22             : static int opt_n_threads = 8; /* -T: thread count passed to threadpool_new(). */
      23             : static int opt_n_items = 10000; /* -N: total number of work items to send. */
      24             : static int opt_n_inflight = 1000; /* -I: cap on items queued at once. */
      25             : static int opt_n_lowwater = 250; /* -L: refill when fewer are pending. */
      26             : static int opt_n_cancel = 0; /* -C: items to try cancelling per refill batch. */
      27             : static int opt_ratio_rsa = 5; /* -R: one in this many items is RSA; 0 = all. */
      28             : 
      29             : #ifdef TRACK_RESPONSES
                       /* Optional debugging aid: remembers which serial numbers a worker has
                        * finished.  mark_handled() writes it, replysock_readable_cb() prints
                        * it, and both take bitmap_mutex while doing so. */
      30             : tor_mutex_t bitmap_mutex;
      31             : int handled_len; /* Set to opt_n_items in main(); bound checked by mark_handled(). */
      32             : bitarray_t *handled; /* Bit i is set once item i has been processed. */
      33             : #endif
      34             : 
      35             : typedef struct state_t {
                         /* State handed to every work function as its `state` argument;
                          * allocated by new_state() and released by free_state() or
                          * workqueue_do_shutdown(). */
      36             :   int magic; /* Set to 13371337 by new_state(); work functions assert it. */
      37             :   int n_handled; /* Incremented once per completed RSA or ECDH job. */
      38             :   crypto_pk_t *rsa; /* Key used by workqueue_do_rsa() for signing. */
      39             :   curve25519_secret_key_t ecdh; /* Secret key used by workqueue_do_ecdh(). */
      40             :   int is_shutdown; /* NOTE(review): not referenced in the visible code. */
      41             : } state_t;
      42             : 
      43             : typedef struct rsa_work_t {
                         /* One RSA signing job, created by add_work() and consumed by
                          * workqueue_do_rsa(). */
      44             :   int serial; /* Value of n_sent at the time the item was created. */
      45             :   uint8_t msg[128]; /* In: bytes to sign.  Out: overwritten with the signature. */
      46             :   uint8_t msglen; /* Number of meaningful bytes in msg; 0 if signing failed. */
      47             : } rsa_work_t;
      48             : 
      49             : typedef struct ecdh_work_t {
                         /* One curve25519 handshake job, created by add_work() and consumed by
                          * workqueue_do_ecdh(). */
      50             :   int serial; /* Value of n_sent at the time the item was created. */
      51             :   union {
                           /* Input and output share the same storage. */
      52             :     curve25519_public_key_t pk; /* In: filled with random bytes by add_work(). */
      53             :     uint8_t msg[32]; /* Out: handshake output copied in by the worker. */
      54             :   } u;
      55             : } ecdh_work_t;
      56             : 
      57             : static void
      58             : mark_handled(int serial)
      59             : {
      60             : #ifdef TRACK_RESPONSES
      61             :   tor_mutex_acquire(&bitmap_mutex);
      62             :   tor_assert(serial < handled_len);
      63             :   tor_assert(! bitarray_is_set(handled, serial));
      64             :   bitarray_set(handled, serial);
      65             :   tor_mutex_release(&bitmap_mutex);
      66             : #else /* !defined(TRACK_RESPONSES) */
      67             :   (void)serial;
      68             : #endif /* defined(TRACK_RESPONSES) */
      69             : }
      70             : 
      71             : static workqueue_reply_t
      72       14127 : workqueue_do_rsa(void *state, void *work)
      73             : {
      74       14127 :   rsa_work_t *rw = work;
      75       14127 :   state_t *st = state;
      76       14127 :   crypto_pk_t *rsa = st->rsa;
      77       14127 :   uint8_t sig[256];
      78       14127 :   int len;
      79             : 
      80       14127 :   tor_assert(st->magic == 13371337);
      81             : 
      82       27835 :   len = crypto_pk_private_sign(rsa, (char*)sig, 256,
      83       14127 :                                (char*)rw->msg, rw->msglen);
      84       13708 :   if (len < 0) {
      85           0 :     rw->msglen = 0;
      86           0 :     return WQ_RPL_ERROR;
      87             :   }
      88             : 
      89       13708 :   memset(rw->msg, 0, sizeof(rw->msg));
      90       13708 :   rw->msglen = len;
      91       13708 :   memcpy(rw->msg, sig, len);
      92       13708 :   ++st->n_handled;
      93             : 
      94       13708 :   mark_handled(rw->serial);
      95             : 
      96       13708 :   return WQ_RPL_REPLY;
      97             : }
      98             : 
      99             : static workqueue_reply_t
     100          54 : workqueue_do_shutdown(void *state, void *work)
     101             : {
     102          54 :   (void)state;
     103          54 :   (void)work;
     104          54 :   crypto_pk_free(((state_t*)state)->rsa);
     105          53 :   tor_free(state);
     106          53 :   return WQ_RPL_SHUTDOWN;
     107             : }
     108             : 
     109             : static workqueue_reply_t
     110       55855 : workqueue_do_ecdh(void *state, void *work)
     111             : {
     112       55855 :   ecdh_work_t *ew = work;
     113       55855 :   uint8_t output[CURVE25519_OUTPUT_LEN];
     114       55855 :   state_t *st = state;
     115             : 
     116       55855 :   tor_assert(st->magic == 13371337);
     117             : 
     118       55855 :   curve25519_handshake(output, &st->ecdh, &ew->u.pk);
     119       55223 :   memcpy(ew->u.msg, output, CURVE25519_OUTPUT_LEN);
     120       55223 :   ++st->n_handled;
     121       55223 :   mark_handled(ew->serial);
     122       55223 :   return WQ_RPL_REPLY;
     123             : }
     124             : 
     125             : static workqueue_reply_t
     126           0 : workqueue_shutdown_error(void *state, void *work)
     127             : {
     128           0 :   (void)state;
     129           0 :   (void)work;
     130           0 :   return WQ_RPL_REPLY;
     131             : }
     132             : 
     133             : static void *
     134          56 : new_state(void *arg)
     135             : {
     136          56 :   state_t *st;
     137          56 :   (void)arg;
     138             : 
     139          56 :   st = tor_malloc(sizeof(*st));
     140             :   /* Every thread gets its own keys. not a problem for benchmarking */
     141          56 :   st->rsa = crypto_pk_new();
     142          56 :   if (crypto_pk_generate_key_with_bits(st->rsa, 1024) < 0) {
     143           0 :     crypto_pk_free(st->rsa);
     144           0 :     tor_free(st);
     145           0 :     return NULL;
     146             :   }
     147          56 :   curve25519_secret_key_generate(&st->ecdh, 0);
     148          56 :   st->magic = 13371337;
     149          56 :   return st;
     150             : }
     151             : 
     152             : static void
     153           0 : free_state(void *arg)
     154             : {
     155           0 :   state_t *st = arg;
     156           0 :   crypto_pk_free(st->rsa);
     157           0 :   tor_free(st);
     158           0 : }
     159             : 
     160             : static tor_weak_rng_t weak_rng; /* Seeded in main(); picks item type and cancel slots. */
     161             : static int n_sent = 0; /* Items created by add_work(); also the next serial. */
     162             : static int rsa_sent = 0; /* How many of those were RSA items. */
     163             : static int ecdh_sent = 0; /* How many of those were ECDH items. */
     164             : static int n_received_previously = 0; /* n_received as last seen by replysock_readable_cb(). */
     165             : static int n_received = 0; /* Replies processed by handle_reply(). */
     166             : static int no_shutdown = 0; /* Set to 1 if handle_reply_shutdown() ever runs. */
     167             : 
     168             : #ifdef TRACK_RESPONSES
     169             : bitarray_t *received; /* Bit i is set once handle_reply() has seen item i. */
     170             : #endif
     171             : 
     172             : static void
     173       69991 : handle_reply(void *arg)
     174             : {
     175             : #ifdef TRACK_RESPONSES
     176             :   rsa_work_t *rw = arg; /* Naughty cast, but only looking at serial. */
     177             :   tor_assert(! bitarray_is_set(received, rw->serial));
     178             :   bitarray_set(received,rw->serial);
     179             : #endif
     180             : 
     181       69991 :   tor_free(arg);
     182       69991 :   ++n_received;
     183       69991 : }
     184             : 
     185             : /* This should never get called. */
     186             : static void
     187           0 : handle_reply_shutdown(void *arg)
     188             : {
     189           0 :   (void)arg;
     190           0 :   no_shutdown = 1;
     191           0 : }
     192             : 
     193             : static workqueue_entry_t *
     194       70000 : add_work(threadpool_t *tp)
     195             : {
     196      140000 :   int add_rsa =
     197      140000 :     opt_ratio_rsa == 0 ||
     198       70000 :     tor_weak_random_range(&weak_rng, opt_ratio_rsa) == 0;
     199             : 
     200       70000 :   if (add_rsa) {
     201       14128 :     rsa_work_t *w = tor_malloc_zero(sizeof(*w));
     202       14128 :     w->serial = n_sent++;
     203       14128 :     crypto_rand((char*)w->msg, 20);
     204       14128 :     w->msglen = 20;
     205       14128 :     ++rsa_sent;
     206       14128 :     return threadpool_queue_work_priority(tp,
     207             :                                           WQ_PRI_MED,
     208             :                                           workqueue_do_rsa, handle_reply, w);
     209             :   } else {
     210       55872 :     ecdh_work_t *w = tor_malloc_zero(sizeof(*w));
     211       55872 :     w->serial = n_sent++;
     212             :     /* Not strictly right, but this is just for benchmarks. */
     213       55872 :     crypto_rand((char*)w->u.pk.public_key, 32);
     214       55872 :     ++ecdh_sent;
     215       55872 :     return threadpool_queue_work(tp, workqueue_do_ecdh, handle_reply, w);
     216             :   }
     217             : }
     218             : 
     219             : static int n_failed_cancel = 0;
     220             : static int n_successful_cancel = 0;
     221             : 
     222             : static int
     223        1045 : add_n_work_items(threadpool_t *tp, int n)
     224             : {
     225        1045 :   int n_queued = 0;
     226        1045 :   int n_try_cancel = 0, i;
     227        1045 :   workqueue_entry_t **to_cancel;
     228        1045 :   workqueue_entry_t *ent;
     229             : 
     230             :   // We'll choose randomly which entries to cancel.
     231        1045 :   to_cancel = tor_calloc(opt_n_cancel, sizeof(workqueue_entry_t*));
     232             : 
     233       64045 :   while (n_queued++ < n) {
     234       63000 :     ent = add_work(tp);
     235       63000 :     if (! ent) {
     236           0 :       puts("Z");
     237           0 :       tor_libevent_exit_loop_after_delay(tor_libevent_get_base(), NULL);
     238           0 :       return -1;
     239             :     }
     240             : 
     241       63000 :     if (n_try_cancel < opt_n_cancel) {
     242          10 :       to_cancel[n_try_cancel++] = ent;
     243             :     } else {
     244       62990 :       int p = tor_weak_random_range(&weak_rng, n_queued);
     245       62990 :       if (p < n_try_cancel) {
     246          64 :         to_cancel[p] = ent;
     247             :       }
     248             :     }
     249             :   }
     250             : 
     251        1055 :   for (i = 0; i < n_try_cancel; ++i) {
     252          10 :     void *work = workqueue_entry_cancel(to_cancel[i]);
     253          10 :     if (! work) {
     254           1 :       n_failed_cancel++;
     255             :     } else {
     256           9 :       n_successful_cancel++;
     257           9 :       tor_free(work);
     258             :     }
     259             :   }
     260             : 
     261        1045 :   tor_free(to_cancel);
     262        1045 :   return 0;
     263             : }
     264             : 
     265             : static int shutting_down = 0;
     266             : 
     267             : static void
     268       18995 : replysock_readable_cb(threadpool_t *tp)
     269             : {
     270       18995 :   if (n_received_previously == n_received)
     271             :     return;
     272             : 
     273       18896 :   n_received_previously = n_received;
     274             : 
     275       18896 :   if (opt_verbose) {
     276           0 :     printf("%d / %d", n_received, n_sent);
     277           0 :     if (opt_n_cancel)
     278           0 :       printf(" (%d cancelled, %d uncancellable)",
     279             :              n_successful_cancel, n_failed_cancel);
     280           0 :     puts("");
     281             :   }
     282             : #ifdef TRACK_RESPONSES
     283             :   tor_mutex_acquire(&bitmap_mutex);
     284             :   for (i = 0; i < opt_n_items; ++i) {
     285             :     if (bitarray_is_set(received, i))
     286             :       putc('o', stdout);
     287             :     else if (bitarray_is_set(handled, i))
     288             :       putc('!', stdout);
     289             :     else
     290             :       putc('.', stdout);
     291             :   }
     292             :   puts("");
     293             :   tor_mutex_release(&bitmap_mutex);
     294             : #endif /* defined(TRACK_RESPONSES) */
     295             : 
     296       18896 :   if (n_sent - (n_received+n_successful_cancel) < opt_n_lowwater) {
     297        1045 :     int n_to_send = n_received + opt_n_inflight - n_sent;
     298        1045 :     if (n_to_send > opt_n_items - n_sent)
     299             :       n_to_send = opt_n_items - n_sent;
     300        1045 :     add_n_work_items(tp, n_to_send);
     301             :   }
     302             : 
     303       18896 :   if (shutting_down == 0 &&
     304       18896 :       n_received+n_successful_cancel == n_sent &&
     305           7 :       n_sent >= opt_n_items) {
     306           7 :     shutting_down = 1;
     307           7 :     threadpool_queue_update(tp, NULL,
     308             :                              workqueue_do_shutdown, NULL, NULL);
     309             :     // Anything we add after starting the shutdown must not be executed.
     310           7 :     threadpool_queue_work(tp, workqueue_shutdown_error,
     311             :                           handle_reply_shutdown, NULL);
     312             :     {
     313           7 :       struct timeval limit = { 2, 0 };
     314           7 :       tor_libevent_exit_loop_after_delay(tor_libevent_get_base(), &limit);
     315             :     }
     316             :   }
     317             : }
     318             : 
     319             : static void
     320           0 : help(void)
     321             : {
     322           0 :   puts(
     323             :      "Options:\n"
     324             :      "  -h            Display this information\n"
     325             :      "  -v            Be verbose\n"
     326             :      "  -N <items>    Run this many items of work\n"
     327             :      "  -T <threads>  Use this many threads\n"
     328             :      "  -I <inflight> Have no more than this many requests queued at once\n"
     329             :      "  -L <lowwater> Add items whenever fewer than this many are pending\n"
     330             :      "  -C <cancel>   Try to cancel N items of every batch that we add\n"
     331             :      "  -R <ratio>    Make one out of this many items be a slow (RSA) one\n"
     332             :      "  --no-{eventfd2,eventfd,pipe2,pipe,socketpair}\n"
     333             :      "                Disable one of the alert_socket backends.");
     334           0 : }
     335             : 
     336             : int
     337           7 : main(int argc, char **argv)
     338             : {
     339           7 :   replyqueue_t *rq;
     340           7 :   threadpool_t *tp;
     341           7 :   int i;
     342           7 :   tor_libevent_cfg_t evcfg;
     343           7 :   uint32_t as_flags = 0;
     344             : 
     345          28 :   for (i = 1; i < argc; ++i) {
     346          21 :     if (!strcmp(argv[i], "-v")) {
     347           0 :       opt_verbose = 1;
     348          21 :     } else if (!strcmp(argv[i], "-T") && i+1<argc) {
     349           0 :       opt_n_threads = atoi(argv[++i]);
     350          21 :     } else if (!strcmp(argv[i], "-N") && i+1<argc) {
     351           0 :       opt_n_items = atoi(argv[++i]);
     352          21 :     } else if (!strcmp(argv[i], "-I") && i+1<argc) {
     353           0 :       opt_n_inflight = atoi(argv[++i]);
     354          21 :     } else if (!strcmp(argv[i], "-L") && i+1<argc) {
     355           0 :       opt_n_lowwater = atoi(argv[++i]);
     356          21 :     } else if (!strcmp(argv[i], "-R") && i+1<argc) {
     357           0 :       opt_ratio_rsa = atoi(argv[++i]);
     358          21 :     } else if (!strcmp(argv[i], "-C") && i+1<argc) {
     359           1 :       opt_n_cancel = atoi(argv[++i]);
     360          20 :     } else if (!strcmp(argv[i], "--no-eventfd2")) {
     361           4 :       as_flags |= ASOCKS_NOEVENTFD2;
     362          16 :     } else if (!strcmp(argv[i], "--no-eventfd")) {
     363           4 :       as_flags |= ASOCKS_NOEVENTFD;
     364          12 :     } else if (!strcmp(argv[i], "--no-pipe2")) {
     365           4 :       as_flags |= ASOCKS_NOPIPE2;
     366           8 :     } else if (!strcmp(argv[i], "--no-pipe")) {
     367           4 :       as_flags |= ASOCKS_NOPIPE;
     368           4 :     } else if (!strcmp(argv[i], "--no-socketpair")) {
     369           4 :       as_flags |= ASOCKS_NOSOCKETPAIR;
     370           0 :     } else if (!strcmp(argv[i], "-h")) {
     371           0 :       help();
     372           7 :       return 0;
     373             :     } else {
     374           0 :       help();
     375           0 :       return 1;
     376             :     }
     377             :   }
     378             : 
     379           7 :   if (opt_n_threads < 1 ||
     380           7 :       opt_n_items < 1 || opt_n_inflight < 1 || opt_n_lowwater < 0 ||
     381           7 :       opt_n_cancel > opt_n_inflight || opt_n_inflight > MAX_INFLIGHT ||
     382           7 :       opt_ratio_rsa < 0) {
     383           0 :     help();
     384           0 :     return 1;
     385             :   }
     386             : 
     387           7 :   if (opt_n_inflight > opt_n_items) {
     388           0 :       opt_n_inflight = opt_n_items;
     389             :   }
     390             : 
     391           7 :   init_logging(1);
     392           7 :   network_init();
     393           7 :   if (crypto_global_init(1, NULL, NULL) < 0) {
     394           0 :     printf("Couldn't initialize crypto subsystem; exiting.\n");
     395           0 :     return 1;
     396             :   }
     397           7 :   if (crypto_seed_rng() < 0) {
     398           0 :     printf("Couldn't seed RNG; exiting.\n");
     399           0 :     return 1;
     400             :   }
     401             : 
     402           7 :   rq = replyqueue_new(as_flags);
     403           7 :   if (as_flags && rq == NULL)
     404             :     return 77; // 77 means "skipped".
     405             : 
     406           7 :   tor_assert(rq);
     407           7 :   tp = threadpool_new(opt_n_threads,
     408             :                       rq, new_state, free_state, NULL);
     409           7 :   tor_assert(tp);
     410             : 
     411           7 :   crypto_seed_weak_rng(&weak_rng);
     412             : 
     413           7 :   memset(&evcfg, 0, sizeof(evcfg));
     414           7 :   tor_libevent_initialize(&evcfg);
     415             : 
     416             :   {
     417           7 :     int r = threadpool_register_reply_event(tp,
     418             :                                             replysock_readable_cb);
     419           7 :     tor_assert(r == 0);
     420             :   }
     421             : 
     422             : #ifdef TRACK_RESPONSES
     423             :   handled = bitarray_init_zero(opt_n_items);
     424             :   received = bitarray_init_zero(opt_n_items);
     425             :   tor_mutex_init(&bitmap_mutex);
     426             :   handled_len = opt_n_items;
     427             : #endif /* defined(TRACK_RESPONSES) */
     428             : 
     429        7007 :   for (i = 0; i < opt_n_inflight; ++i) {
     430        7000 :     if (! add_work(tp)) {
     431           0 :       puts("Couldn't add work.");
     432           0 :       return 1;
     433             :     }
     434             :   }
     435             : 
     436             :   {
     437           7 :     struct timeval limit = { 180, 0 };
     438           7 :     tor_libevent_exit_loop_after_delay(tor_libevent_get_base(), &limit);
     439             :   }
     440             : 
     441           7 :   tor_libevent_run_event_loop(tor_libevent_get_base(), 0);
     442             : 
     443           7 :   if (n_sent != opt_n_items || n_received+n_successful_cancel != n_sent) {
     444           0 :     printf("%d vs %d\n", n_sent, opt_n_items);
     445           0 :     printf("%d+%d vs %d\n", n_received, n_successful_cancel, n_sent);
     446           0 :     puts("FAIL");
     447           0 :     return 1;
     448           7 :   } else if (no_shutdown) {
     449           0 :     puts("Accepted work after shutdown\n");
     450           0 :     puts("FAIL");
     451             :   } else {
     452           7 :     puts("OK");
     453           7 :     return 0;
     454             :   }
     455             : }

Generated by: LCOV version 1.14