LCOV - code coverage report
Current view: top level - core/mainloop - cpuworker.c (source / functions) Hit Total Coverage
Test: lcov.info Lines: 0 225 0.0 %
Date: 2021-11-24 03:28:48 Functions: 0 16 0.0 %

          Line data    Source code
       1             : /* Copyright (c) 2003-2004, Roger Dingledine.
       2             :  * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
       3             :  * Copyright (c) 2007-2021, The Tor Project, Inc. */
       4             : /* See LICENSE for licensing information */
       5             : 
       6             : /**
       7             :  * \file cpuworker.c
       8             :  * \brief Uses the workqueue/threadpool code to farm CPU-intensive activities
       9             :  * out to subprocesses.
      10             :  *
      11             :  * The multithreading backend for this module is in workqueue.c; this module
      12             :  * specializes workqueue.c.
      13             :  *
      14             :  * Right now, we use this infrastructure
      15             :  *  <ul><li>for processing onionskins in onion.c
      16             :  *      <li>for compressing consensuses in consdiffmgr.c,
      17             :  *      <li>and for calculating diffs and compressing them in consdiffmgr.c.
      18             :  *  </ul>
      19             :  **/
      20             : #include "core/or/or.h"
      21             : #include "core/or/channel.h"
      22             : #include "core/or/circuitlist.h"
      23             : #include "core/or/connection_or.h"
      24             : #include "app/config/config.h"
      25             : #include "core/mainloop/cpuworker.h"
      26             : #include "lib/crypt_ops/crypto_rand.h"
      27             : #include "lib/crypt_ops/crypto_util.h"
      28             : #include "core/or/onion.h"
      29             : #include "feature/relay/circuitbuild_relay.h"
      30             : #include "feature/relay/onion_queue.h"
      31             : #include "feature/stats/rephist.h"
      32             : #include "feature/relay/router.h"
      33             : #include "lib/evloop/workqueue.h"
      34             : #include "core/crypto/onion_crypto.h"
      35             : 
      36             : #include "core/or/or_circuit_st.h"
      37             : 
      38             : static void queue_pending_tasks(void);
      39             : 
      40             : typedef struct worker_state_t {
      41             :   int generation;
      42             :   server_onion_keys_t *onion_keys;
      43             : } worker_state_t;
      44             : 
      45             : static void *
      46           0 : worker_state_new(void *arg)
      47             : {
      48           0 :   worker_state_t *ws;
      49           0 :   (void)arg;
      50           0 :   ws = tor_malloc_zero(sizeof(worker_state_t));
      51           0 :   ws->onion_keys = server_onion_keys_new();
      52           0 :   return ws;
      53             : }
      54             : 
      55             : #define worker_state_free(ws) \
      56             :   FREE_AND_NULL(worker_state_t, worker_state_free_, (ws))
      57             : 
      58             : static void
      59           0 : worker_state_free_(worker_state_t *ws)
      60             : {
      61           0 :   if (!ws)
      62             :     return;
      63           0 :   server_onion_keys_free(ws->onion_keys);
      64           0 :   tor_free(ws);
      65             : }
      66             : 
      67             : static void
      68           0 : worker_state_free_void(void *arg)
      69             : {
      70           0 :   worker_state_free_(arg);
      71           0 : }
      72             : 
      73             : static replyqueue_t *replyqueue = NULL;
      74             : static threadpool_t *threadpool = NULL;
      75             : 
      76             : static int total_pending_tasks = 0;
      77             : static int max_pending_tasks = 128;
      78             : 
      79             : /** Initialize the cpuworker subsystem. It is OK to call this more than once
      80             :  * during Tor's lifetime.
      81             :  */
      82             : void
      83           0 : cpu_init(void)
      84             : {
      85           0 :   if (!replyqueue) {
      86           0 :     replyqueue = replyqueue_new(0);
      87             :   }
      88           0 :   if (!threadpool) {
      89             :     /*
      90             :       In our threadpool implementation, half the threads are permissive and
      91             :       half are strict (when it comes to running lower-priority tasks). So we
      92             :       always make sure we have at least two threads, so that there will be at
      93             :       least one thread of each kind.
      94             :     */
      95           0 :     const int n_threads = get_num_cpus(get_options()) + 1;
      96           0 :     threadpool = threadpool_new(n_threads,
      97             :                                 replyqueue,
      98             :                                 worker_state_new,
      99             :                                 worker_state_free_void,
     100             :                                 NULL);
     101             : 
     102           0 :     int r = threadpool_register_reply_event(threadpool, NULL);
     103             : 
     104           0 :     tor_assert(r == 0);
     105             :   }
     106             : 
     107             :   /* Total voodoo. Can we make this more sensible? */
     108           0 :   max_pending_tasks = get_num_cpus(get_options()) * 64;
     109           0 : }
     110             : 
     111             : /** Magic numbers to make sure our cpuworker_requests don't grow any
     112             :  * mis-framing bugs. */
     113             : #define CPUWORKER_REQUEST_MAGIC 0xda4afeed
     114             : #define CPUWORKER_REPLY_MAGIC 0x5eedf00d
     115             : 
     116             : /** A request sent to a cpuworker. */
     117             : typedef struct cpuworker_request_t {
     118             :   /** Magic number; must be CPUWORKER_REQUEST_MAGIC. */
     119             :   uint32_t magic;
     120             : 
     121             :   /** Flag: Are we timing this request? */
     122             :   unsigned timed : 1;
     123             :   /** If we're timing this request, when was it sent to the cpuworker? */
     124             :   struct timeval started_at;
     125             : 
     126             :   /** A create cell for the cpuworker to process. */
     127             :   create_cell_t create_cell;
     128             : 
     129             :   /* Turn the above into a tagged union if needed. */
     130             : } cpuworker_request_t;
     131             : 
     132             : /** A reply sent by a cpuworker. */
     133             : typedef struct cpuworker_reply_t {
     134             :   /** Magic number; must be CPUWORKER_REPLY_MAGIC. */
     135             :   uint32_t magic;
     136             : 
     137             :   /** True iff we got a successful request. */
     138             :   uint8_t success;
     139             : 
     140             :   /** Are we timing this request? */
     141             :   unsigned int timed : 1;
     142             :   /** What handshake type was the request? (Used for timing) */
     143             :   uint16_t handshake_type;
     144             :   /** When did we send the request to the cpuworker? */
     145             :   struct timeval started_at;
     146             :   /** Once the cpuworker received the request, how many microseconds did it
     147             :    * take? (This shouldn't overflow; 4 billion micoseconds is over an hour,
     148             :    * and we'll never have an onion handshake that takes so long.) */
     149             :   uint32_t n_usec;
     150             : 
     151             :   /** Output of processing a create cell
     152             :    *
     153             :    * @{
     154             :    */
     155             :   /** The created cell to send back. */
     156             :   created_cell_t created_cell;
     157             :   /** The keys to use on this circuit. */
     158             :   uint8_t keys[CPATH_KEY_MATERIAL_LEN];
     159             :   /** Input to use for authenticating introduce1 cells. */
     160             :   uint8_t rend_auth_material[DIGEST_LEN];
     161             : } cpuworker_reply_t;
     162             : 
     163             : typedef struct cpuworker_job_u_t {
     164             :   or_circuit_t *circ;
     165             :   union {
     166             :     cpuworker_request_t request;
     167             :     cpuworker_reply_t reply;
     168             :   } u;
     169             : } cpuworker_job_t;
     170             : 
     171             : static workqueue_reply_t
     172           0 : update_state_threadfn(void *state_, void *work_)
     173             : {
     174           0 :   worker_state_t *state = state_;
     175           0 :   worker_state_t *update = work_;
     176           0 :   server_onion_keys_free(state->onion_keys);
     177           0 :   state->onion_keys = update->onion_keys;
     178           0 :   update->onion_keys = NULL;
     179           0 :   worker_state_free(update);
     180           0 :   ++state->generation;
     181           0 :   return WQ_RPL_REPLY;
     182             : }
     183             : 
     184             : /** Called when the onion key has changed so update all CPU worker(s) with
     185             :  * new function pointers with which a new state will be generated.
     186             :  */
     187             : void
     188           0 : cpuworkers_rotate_keyinfo(void)
     189             : {
     190           0 :   if (!threadpool) {
     191             :     /* If we're a client, then we won't have cpuworkers, and we won't need
     192             :      * to tell them to rotate their state.
     193             :      */
     194             :     return;
     195             :   }
     196           0 :   if (threadpool_queue_update(threadpool,
     197             :                               worker_state_new,
     198             :                               update_state_threadfn,
     199             :                               worker_state_free_void,
     200             :                               NULL)) {
     201           0 :     log_warn(LD_OR, "Failed to queue key update for worker threads.");
     202             :   }
     203             : }
     204             : 
     205             : /** Indexed by handshake type: how many onionskins have we processed and
     206             :  * counted of that type? */
     207             : static uint64_t onionskins_n_processed[MAX_ONION_HANDSHAKE_TYPE+1];
     208             : /** Indexed by handshake type, corresponding to the onionskins counted in
     209             :  * onionskins_n_processed: how many microseconds have we spent in cpuworkers
     210             :  * processing that kind of onionskin? */
     211             : static uint64_t onionskins_usec_internal[MAX_ONION_HANDSHAKE_TYPE+1];
     212             : /** Indexed by handshake type, corresponding to onionskins counted in
     213             :  * onionskins_n_processed: how many microseconds have we spent waiting for
     214             :  * cpuworkers to give us answers for that kind of onionskin?
     215             :  */
     216             : static uint64_t onionskins_usec_roundtrip[MAX_ONION_HANDSHAKE_TYPE+1];
     217             : 
     218             : /** If any onionskin takes longer than this, we clip them to this
     219             :  * time. (microseconds) */
     220             : #define MAX_BELIEVABLE_ONIONSKIN_DELAY (2*1000*1000)
     221             : 
     222             : /** Return true iff we'd like to measure a handshake of type
     223             :  * <b>onionskin_type</b>. Call only from the main thread. */
     224             : static int
     225           0 : should_time_request(uint16_t onionskin_type)
     226             : {
     227             :   /* If we've never heard of this type, we shouldn't even be here. */
     228           0 :   if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE)
     229             :     return 0;
     230             :   /* Measure the first N handshakes of each type, to ensure we have a
     231             :    * sample */
     232           0 :   if (onionskins_n_processed[onionskin_type] < 4096)
     233             :     return 1;
     234             : 
     235             :   /** Otherwise, measure with P=1/128.  We avoid doing this for every
     236             :    * handshake, since the measurement itself can take a little time. */
     237           0 :   return crypto_fast_rng_one_in_n(get_thread_fast_rng(), 128);
     238             : }
     239             : 
     240             : /** Return an estimate of how many microseconds we will need for a single
     241             :  * cpuworker to process <b>n_requests</b> onionskins of type
     242             :  * <b>onionskin_type</b>. */
     243             : uint64_t
     244           0 : estimated_usec_for_onionskins(uint32_t n_requests, uint16_t onionskin_type)
     245             : {
     246           0 :   if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */
     247           0 :     return 1000 * (uint64_t)n_requests;
     248           0 :   if (PREDICT_UNLIKELY(onionskins_n_processed[onionskin_type] < 100)) {
     249             :     /* Until we have 100 data points, just assume everything takes 1 msec. */
     250           0 :     return 1000 * (uint64_t)n_requests;
     251             :   } else {
     252             :     /* This can't overflow: we'll never have more than 500000 onionskins
     253             :      * measured in onionskin_usec_internal, and they won't take anything near
     254             :      * 1 sec each, and we won't have anything like 1 million queued
     255             :      * onionskins.  But that's 5e5 * 1e6 * 1e6, which is still less than
     256             :      * UINT64_MAX. */
     257           0 :     return (onionskins_usec_internal[onionskin_type] * n_requests) /
     258             :       onionskins_n_processed[onionskin_type];
     259             :   }
     260             : }
     261             : 
     262             : /** Compute the absolute and relative overhead of using the cpuworker
     263             :  * framework for onionskins of type <b>onionskin_type</b>.*/
     264             : static int
     265           0 : get_overhead_for_onionskins(uint32_t *usec_out, double *frac_out,
     266             :                             uint16_t onionskin_type)
     267             : {
     268           0 :   uint64_t overhead;
     269             : 
     270           0 :   *usec_out = 0;
     271           0 :   *frac_out = 0.0;
     272             : 
     273           0 :   if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */
     274             :     return -1;
     275           0 :   if (onionskins_n_processed[onionskin_type] == 0 ||
     276           0 :       onionskins_usec_internal[onionskin_type] == 0 ||
     277           0 :       onionskins_usec_roundtrip[onionskin_type] == 0)
     278             :     return -1;
     279             : 
     280           0 :   overhead = onionskins_usec_roundtrip[onionskin_type] -
     281             :     onionskins_usec_internal[onionskin_type];
     282             : 
     283           0 :   *usec_out = (uint32_t)(overhead / onionskins_n_processed[onionskin_type]);
     284           0 :   *frac_out = ((double)overhead) / onionskins_usec_internal[onionskin_type];
     285             : 
     286           0 :   return 0;
     287             : }
     288             : 
     289             : /** If we've measured overhead for onionskins of type <b>onionskin_type</b>,
     290             :  * log it. */
     291             : void
     292           0 : cpuworker_log_onionskin_overhead(int severity, int onionskin_type,
     293             :                                  const char *onionskin_type_name)
     294             : {
     295           0 :   uint32_t overhead;
     296           0 :   double relative_overhead;
     297           0 :   int r;
     298             : 
     299           0 :   r = get_overhead_for_onionskins(&overhead,  &relative_overhead,
     300             :                                   onionskin_type);
     301           0 :   if (!overhead || r<0)
     302           0 :     return;
     303             : 
     304           0 :   log_fn(severity, LD_OR,
     305             :          "%s onionskins have averaged %u usec overhead (%.2f%%) in "
     306             :          "cpuworker code ",
     307             :          onionskin_type_name, (unsigned)overhead, relative_overhead*100);
     308             : }
     309             : 
     310             : /** Handle a reply from the worker threads. */
     311             : static void
     312           0 : cpuworker_onion_handshake_replyfn(void *work_)
     313             : {
     314           0 :   cpuworker_job_t *job = work_;
     315           0 :   cpuworker_reply_t rpl;
     316           0 :   or_circuit_t *circ = NULL;
     317             : 
     318           0 :   tor_assert(total_pending_tasks > 0);
     319           0 :   --total_pending_tasks;
     320             : 
     321             :   /* Could avoid this, but doesn't matter. */
     322           0 :   memcpy(&rpl, &job->u.reply, sizeof(rpl));
     323             : 
     324           0 :   tor_assert(rpl.magic == CPUWORKER_REPLY_MAGIC);
     325             : 
     326           0 :   if (rpl.timed && rpl.success &&
     327           0 :       rpl.handshake_type <= MAX_ONION_HANDSHAKE_TYPE) {
     328             :     /* Time how long this request took. The handshake_type check should be
     329             :        needless, but let's leave it in to be safe. */
     330           0 :     struct timeval tv_end, tv_diff;
     331           0 :     int64_t usec_roundtrip;
     332           0 :     tor_gettimeofday(&tv_end);
     333           0 :     timersub(&tv_end, &rpl.started_at, &tv_diff);
     334           0 :     usec_roundtrip = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
     335           0 :     if (usec_roundtrip >= 0 &&
     336             :         usec_roundtrip < MAX_BELIEVABLE_ONIONSKIN_DELAY) {
     337           0 :       ++onionskins_n_processed[rpl.handshake_type];
     338           0 :       onionskins_usec_internal[rpl.handshake_type] += rpl.n_usec;
     339           0 :       onionskins_usec_roundtrip[rpl.handshake_type] += usec_roundtrip;
     340           0 :       if (onionskins_n_processed[rpl.handshake_type] >= 500000) {
     341             :         /* Scale down every 500000 handshakes.  On a busy server, that's
     342             :          * less impressive than it sounds. */
     343           0 :         onionskins_n_processed[rpl.handshake_type] /= 2;
     344           0 :         onionskins_usec_internal[rpl.handshake_type] /= 2;
     345           0 :         onionskins_usec_roundtrip[rpl.handshake_type] /= 2;
     346             :       }
     347             :     }
     348             :   }
     349             : 
     350           0 :   circ = job->circ;
     351             : 
     352           0 :   log_debug(LD_OR,
     353             :             "Unpacking cpuworker reply %p, circ=%p, success=%d",
     354             :             job, circ, rpl.success);
     355             : 
     356           0 :   if (circ->base_.magic == DEAD_CIRCUIT_MAGIC) {
     357             :     /* The circuit was supposed to get freed while the reply was
     358             :      * pending. Instead, it got left for us to free so that we wouldn't freak
     359             :      * out when the job->circ field wound up pointing to nothing. */
     360           0 :     log_debug(LD_OR, "Circuit died while reply was pending. Freeing memory.");
     361           0 :     circ->base_.magic = 0;
     362           0 :     tor_free(circ);
     363           0 :     goto done_processing;
     364             :   }
     365             : 
     366           0 :   circ->workqueue_entry = NULL;
     367             : 
     368           0 :   if (TO_CIRCUIT(circ)->marked_for_close) {
     369             :     /* We already marked this circuit; we can't call it open. */
     370           0 :     log_debug(LD_OR,"circuit is already marked.");
     371           0 :     goto done_processing;
     372             :   }
     373             : 
     374           0 :   if (rpl.success == 0) {
     375           0 :     log_debug(LD_OR,
     376             :               "decoding onionskin failed. "
     377             :               "(Old key or bad software.) Closing.");
     378           0 :     circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_TORPROTOCOL);
     379           0 :     goto done_processing;
     380             :   }
     381             : 
     382           0 :   if (onionskin_answer(circ,
     383             :                        &rpl.created_cell,
     384             :                        (const char*)rpl.keys, sizeof(rpl.keys),
     385             :                        rpl.rend_auth_material) < 0) {
     386           0 :     log_warn(LD_OR,"onionskin_answer failed. Closing.");
     387           0 :     circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_INTERNAL);
     388           0 :     goto done_processing;
     389             :   }
     390           0 :   log_debug(LD_OR,"onionskin_answer succeeded. Yay.");
     391             : 
     392           0 :  done_processing:
     393           0 :   memwipe(&rpl, 0, sizeof(rpl));
     394           0 :   memwipe(job, 0, sizeof(*job));
     395           0 :   tor_free(job);
     396           0 :   queue_pending_tasks();
     397           0 : }
     398             : 
     399             : /** Implementation function for onion handshake requests. */
     400             : static workqueue_reply_t
     401           0 : cpuworker_onion_handshake_threadfn(void *state_, void *work_)
     402             : {
     403           0 :   worker_state_t *state = state_;
     404           0 :   cpuworker_job_t *job = work_;
     405             : 
     406             :   /* variables for onion processing */
     407           0 :   server_onion_keys_t *onion_keys = state->onion_keys;
     408           0 :   cpuworker_request_t req;
     409           0 :   cpuworker_reply_t rpl;
     410             : 
     411           0 :   memcpy(&req, &job->u.request, sizeof(req));
     412             : 
     413           0 :   tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC);
     414           0 :   memset(&rpl, 0, sizeof(rpl));
     415             : 
     416           0 :   const create_cell_t *cc = &req.create_cell;
     417           0 :   created_cell_t *cell_out = &rpl.created_cell;
     418           0 :   struct timeval tv_start = {0,0}, tv_end;
     419           0 :   int n;
     420           0 :   rpl.timed = req.timed;
     421           0 :   rpl.started_at = req.started_at;
     422           0 :   rpl.handshake_type = cc->handshake_type;
     423           0 :   if (req.timed)
     424           0 :     tor_gettimeofday(&tv_start);
     425           0 :   n = onion_skin_server_handshake(cc->handshake_type,
     426           0 :                                   cc->onionskin, cc->handshake_len,
     427             :                                   onion_keys,
     428             :                                   cell_out->reply,
     429             :                                   rpl.keys, CPATH_KEY_MATERIAL_LEN,
     430             :                                   rpl.rend_auth_material);
     431           0 :   if (n < 0) {
     432             :     /* failure */
     433           0 :     log_debug(LD_OR,"onion_skin_server_handshake failed.");
     434           0 :     memset(&rpl, 0, sizeof(rpl));
     435           0 :     rpl.success = 0;
     436             :   } else {
     437             :     /* success */
     438           0 :     log_debug(LD_OR,"onion_skin_server_handshake succeeded.");
     439           0 :     cell_out->handshake_len = n;
     440           0 :     switch (cc->cell_type) {
     441           0 :     case CELL_CREATE:
     442           0 :       cell_out->cell_type = CELL_CREATED; break;
     443           0 :     case CELL_CREATE2:
     444           0 :       cell_out->cell_type = CELL_CREATED2; break;
     445           0 :     case CELL_CREATE_FAST:
     446           0 :       cell_out->cell_type = CELL_CREATED_FAST; break;
     447             :     default:
     448           0 :       tor_assert(0);
     449             :       return WQ_RPL_SHUTDOWN;
     450             :     }
     451           0 :     rpl.success = 1;
     452             :   }
     453           0 :   rpl.magic = CPUWORKER_REPLY_MAGIC;
     454           0 :   if (req.timed) {
     455           0 :     struct timeval tv_diff;
     456           0 :     int64_t usec;
     457           0 :     tor_gettimeofday(&tv_end);
     458           0 :     timersub(&tv_end, &tv_start, &tv_diff);
     459           0 :     usec = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
     460           0 :     if (usec < 0 || usec > MAX_BELIEVABLE_ONIONSKIN_DELAY)
     461           0 :       rpl.n_usec = MAX_BELIEVABLE_ONIONSKIN_DELAY;
     462             :     else
     463           0 :       rpl.n_usec = (uint32_t) usec;
     464             :   }
     465             : 
     466           0 :   memcpy(&job->u.reply, &rpl, sizeof(rpl));
     467             : 
     468           0 :   memwipe(&req, 0, sizeof(req));
     469           0 :   memwipe(&rpl, 0, sizeof(req));
     470           0 :   return WQ_RPL_REPLY;
     471             : }
     472             : 
     473             : /** Take pending tasks from the queue and assign them to cpuworkers. */
     474             : static void
     475           0 : queue_pending_tasks(void)
     476             : {
     477           0 :   or_circuit_t *circ;
     478           0 :   create_cell_t *onionskin = NULL;
     479             : 
     480           0 :   while (total_pending_tasks < max_pending_tasks) {
     481           0 :     circ = onion_next_task(&onionskin);
     482             : 
     483           0 :     if (!circ)
     484           0 :       return;
     485             : 
     486           0 :     if (assign_onionskin_to_cpuworker(circ, onionskin) < 0)
     487           0 :       log_info(LD_OR,"assign_to_cpuworker failed. Ignoring.");
     488             :   }
     489             : }
     490             : 
     491             : /** DOCDOC */
     492           0 : MOCK_IMPL(workqueue_entry_t *,
     493             : cpuworker_queue_work,(workqueue_priority_t priority,
     494             :                       workqueue_reply_t (*fn)(void *, void *),
     495             :                       void (*reply_fn)(void *),
     496             :                       void *arg))
     497             : {
     498           0 :   tor_assert(threadpool);
     499             : 
     500           0 :   return threadpool_queue_work_priority(threadpool,
     501             :                                         priority,
     502             :                                         fn,
     503             :                                         reply_fn,
     504             :                                         arg);
     505             : }
     506             : 
     507             : /** Try to tell a cpuworker to perform the public key operations necessary to
     508             :  * respond to <b>onionskin</b> for the circuit <b>circ</b>.
     509             :  *
     510             :  * Return 0 if we successfully assign the task, or -1 on failure.
     511             :  */
     512             : int
     513           0 : assign_onionskin_to_cpuworker(or_circuit_t *circ,
     514             :                               create_cell_t *onionskin)
     515             : {
     516           0 :   workqueue_entry_t *queue_entry;
     517           0 :   cpuworker_job_t *job;
     518           0 :   cpuworker_request_t req;
     519           0 :   int should_time;
     520             : 
     521           0 :   tor_assert(threadpool);
     522             : 
     523           0 :   if (!circ->p_chan) {
     524           0 :     log_info(LD_OR,"circ->p_chan gone. Failing circ.");
     525           0 :     tor_free(onionskin);
     526           0 :     return -1;
     527             :   }
     528             : 
     529           0 :   if (total_pending_tasks >= max_pending_tasks) {
     530           0 :     log_debug(LD_OR,"No idle cpuworkers. Queuing.");
     531           0 :     if (onion_pending_add(circ, onionskin) < 0) {
     532           0 :       tor_free(onionskin);
     533           0 :       return -1;
     534             :     }
     535             :     return 0;
     536             :   }
     537             : 
     538           0 :   if (!channel_is_client(circ->p_chan))
     539           0 :     rep_hist_note_circuit_handshake_assigned(onionskin->handshake_type);
     540             : 
     541           0 :   should_time = should_time_request(onionskin->handshake_type);
     542           0 :   memset(&req, 0, sizeof(req));
     543           0 :   req.magic = CPUWORKER_REQUEST_MAGIC;
     544           0 :   req.timed = should_time;
     545             : 
     546           0 :   memcpy(&req.create_cell, onionskin, sizeof(create_cell_t));
     547             : 
     548           0 :   tor_free(onionskin);
     549             : 
     550           0 :   if (should_time)
     551           0 :     tor_gettimeofday(&req.started_at);
     552             : 
     553           0 :   job = tor_malloc_zero(sizeof(cpuworker_job_t));
     554           0 :   job->circ = circ;
     555           0 :   memcpy(&job->u.request, &req, sizeof(req));
     556           0 :   memwipe(&req, 0, sizeof(req));
     557             : 
     558           0 :   ++total_pending_tasks;
     559           0 :   queue_entry = threadpool_queue_work_priority(threadpool,
     560             :                                       WQ_PRI_HIGH,
     561             :                                       cpuworker_onion_handshake_threadfn,
     562             :                                       cpuworker_onion_handshake_replyfn,
     563             :                                       job);
     564           0 :   if (!queue_entry) {
     565           0 :     log_warn(LD_BUG, "Couldn't queue work on threadpool");
     566           0 :     tor_free(job);
     567           0 :     return -1;
     568             :   }
     569             : 
     570           0 :   log_debug(LD_OR, "Queued task %p (qe=%p, circ=%p)",
     571             :             job, queue_entry, job->circ);
     572             : 
     573           0 :   circ->workqueue_entry = queue_entry;
     574             : 
     575           0 :   return 0;
     576             : }
     577             : 
     578             : /** If <b>circ</b> has a pending handshake that hasn't been processed yet,
     579             :  * remove it from the worker queue. */
     580             : void
     581           0 : cpuworker_cancel_circ_handshake(or_circuit_t *circ)
     582             : {
     583           0 :   cpuworker_job_t *job;
     584           0 :   if (circ->workqueue_entry == NULL)
     585           0 :     return;
     586             : 
     587           0 :   job = workqueue_entry_cancel(circ->workqueue_entry);
     588           0 :   if (job) {
     589             :     /* It successfully cancelled. */
     590           0 :     memwipe(job, 0xe0, sizeof(*job));
     591           0 :     tor_free(job);
     592           0 :     tor_assert(total_pending_tasks > 0);
     593           0 :     --total_pending_tasks;
     594             :     /* if (!job), this is done in cpuworker_onion_handshake_replyfn. */
     595           0 :     circ->workqueue_entry = NULL;
     596             :   }
     597             : }

Generated by: LCOV version 1.14