tor  0.4.2.0-alpha-dev
cpuworker.c
Go to the documentation of this file.
1 /* Copyright (c) 2003-2004, Roger Dingledine.
2  * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
3  * Copyright (c) 2007-2019, The Tor Project, Inc. */
4 /* See LICENSE for licensing information */
5 
20 #include "core/or/or.h"
21 #include "core/or/channel.h"
22 #include "core/or/circuitbuild.h"
23 #include "core/or/circuitlist.h"
24 #include "core/or/connection_or.h"
25 #include "app/config/config.h"
29 #include "core/or/onion.h"
31 #include "feature/stats/rephist.h"
32 #include "feature/relay/router.h"
33 #include "lib/evloop/workqueue.h"
35 
36 #include "core/or/or_circuit_st.h"
37 
38 static void queue_pending_tasks(void);
39 
40 typedef struct worker_state_s {
41  int generation;
42  server_onion_keys_t *onion_keys;
44 
45 static void *
46 worker_state_new(void *arg)
47 {
48  worker_state_t *ws;
49  (void)arg;
50  ws = tor_malloc_zero(sizeof(worker_state_t));
51  ws->onion_keys = server_onion_keys_new();
52  return ws;
53 }
54 
/** Free <b>ws</b> with worker_state_free_() and set the pointer to NULL. */
#define worker_state_free(ws) \
  FREE_AND_NULL(worker_state_t, worker_state_free_, (ws))
57 
58 static void
59 worker_state_free_(worker_state_t *ws)
60 {
61  if (!ws)
62  return;
63  server_onion_keys_free(ws->onion_keys);
64  tor_free(ws);
65 }
66 
/** Adapter with a void* signature so worker_state_free_() can be used as
 * a threadpool free callback. */
static void
worker_state_free_void(void *arg)
{
  worker_state_free_(arg);
}
72 
73 static replyqueue_t *replyqueue = NULL;
74 static threadpool_t *threadpool = NULL;
75 
76 static int total_pending_tasks = 0;
77 static int max_pending_tasks = 128;
78 
82 void
83 cpu_init(void)
84 {
85  if (!replyqueue) {
86  replyqueue = replyqueue_new(0);
87  }
88  if (!threadpool) {
89  /*
90  In our threadpool implementation, half the threads are permissive and
91  half are strict (when it comes to running lower-priority tasks). So we
92  always make sure we have at least two threads, so that there will be at
93  least one thread of each kind.
94  */
95  const int n_threads = get_num_cpus(get_options()) + 1;
96  threadpool = threadpool_new(n_threads,
97  replyqueue,
98  worker_state_new,
99  worker_state_free_void,
100  NULL);
101 
102  int r = threadpool_register_reply_event(threadpool, NULL);
103 
104  tor_assert(r == 0);
105  }
106 
107  /* Total voodoo. Can we make this more sensible? */
108  max_pending_tasks = get_num_cpus(get_options()) * 64;
109 }
110 
/** Magic numbers used to sanity-check request/reply structures as they
 * cross the thread boundary. */
#define CPUWORKER_REQUEST_MAGIC 0xda4afeed
#define CPUWORKER_REPLY_MAGIC 0x5eedf00d
115 
117 typedef struct cpuworker_request_t {
119  uint32_t magic;
120 
122  unsigned timed : 1;
125 
128 
129  /* Turn the above into a tagged union if needed. */
131 
133 typedef struct cpuworker_reply_t {
135  uint32_t magic;
136 
138  uint8_t success;
139 
141  unsigned int timed : 1;
143  uint16_t handshake_type;
149  uint32_t n_usec;
150 
158  uint8_t keys[CPATH_KEY_MATERIAL_LEN];
162 
163 typedef struct cpuworker_job_u {
164  or_circuit_t *circ;
165  union {
166  cpuworker_request_t request;
167  cpuworker_reply_t reply;
168  } u;
170 
171 static workqueue_reply_t
172 update_state_threadfn(void *state_, void *work_)
173 {
174  worker_state_t *state = state_;
175  worker_state_t *update = work_;
176  server_onion_keys_free(state->onion_keys);
177  state->onion_keys = update->onion_keys;
178  update->onion_keys = NULL;
179  worker_state_free(update);
180  ++state->generation;
181  return WQ_RPL_REPLY;
182 }
183 
187 void
189 {
190  if (!threadpool) {
191  /* If we're a client, then we won't have cpuworkers, and we won't need
192  * to tell them to rotate their state.
193  */
194  return;
195  }
196  if (threadpool_queue_update(threadpool,
197  worker_state_new,
198  update_state_threadfn,
199  worker_state_free_void,
200  NULL)) {
201  log_warn(LD_OR, "Failed to queue key update for worker threads.");
202  }
203 }
204 
207 static uint64_t onionskins_n_processed[MAX_ONION_HANDSHAKE_TYPE+1];
211 static uint64_t onionskins_usec_internal[MAX_ONION_HANDSHAKE_TYPE+1];
216 static uint64_t onionskins_usec_roundtrip[MAX_ONION_HANDSHAKE_TYPE+1];
217 
220 #define MAX_BELIEVABLE_ONIONSKIN_DELAY (2*1000*1000)
221 
224 static int
225 should_time_request(uint16_t onionskin_type)
226 {
227  /* If we've never heard of this type, we shouldn't even be here. */
228  if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE)
229  return 0;
230  /* Measure the first N handshakes of each type, to ensure we have a
231  * sample */
232  if (onionskins_n_processed[onionskin_type] < 4096)
233  return 1;
234 
238 }
239 
243 uint64_t
244 estimated_usec_for_onionskins(uint32_t n_requests, uint16_t onionskin_type)
245 {
246  if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */
247  return 1000 * (uint64_t)n_requests;
248  if (PREDICT_UNLIKELY(onionskins_n_processed[onionskin_type] < 100)) {
249  /* Until we have 100 data points, just asssume everything takes 1 msec. */
250  return 1000 * (uint64_t)n_requests;
251  } else {
252  /* This can't overflow: we'll never have more than 500000 onionskins
253  * measured in onionskin_usec_internal, and they won't take anything near
254  * 1 sec each, and we won't have anything like 1 million queued
255  * onionskins. But that's 5e5 * 1e6 * 1e6, which is still less than
256  * UINT64_MAX. */
257  return (onionskins_usec_internal[onionskin_type] * n_requests) /
258  onionskins_n_processed[onionskin_type];
259  }
260 }
261 
264 static int
265 get_overhead_for_onionskins(uint32_t *usec_out, double *frac_out,
266  uint16_t onionskin_type)
267 {
268  uint64_t overhead;
269 
270  *usec_out = 0;
271  *frac_out = 0.0;
272 
273  if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */
274  return -1;
275  if (onionskins_n_processed[onionskin_type] == 0 ||
276  onionskins_usec_internal[onionskin_type] == 0 ||
277  onionskins_usec_roundtrip[onionskin_type] == 0)
278  return -1;
279 
280  overhead = onionskins_usec_roundtrip[onionskin_type] -
281  onionskins_usec_internal[onionskin_type];
282 
283  *usec_out = (uint32_t)(overhead / onionskins_n_processed[onionskin_type]);
284  *frac_out = ((double)overhead) / onionskins_usec_internal[onionskin_type];
285 
286  return 0;
287 }
288 
291 void
292 cpuworker_log_onionskin_overhead(int severity, int onionskin_type,
293  const char *onionskin_type_name)
294 {
295  uint32_t overhead;
296  double relative_overhead;
297  int r;
298 
299  r = get_overhead_for_onionskins(&overhead, &relative_overhead,
300  onionskin_type);
301  if (!overhead || r<0)
302  return;
303 
304  log_fn(severity, LD_OR,
305  "%s onionskins have averaged %u usec overhead (%.2f%%) in "
306  "cpuworker code ",
307  onionskin_type_name, (unsigned)overhead, relative_overhead*100);
308 }
309 
311 static void
313 {
314  cpuworker_job_t *job = work_;
315  cpuworker_reply_t rpl;
316  or_circuit_t *circ = NULL;
317 
318  tor_assert(total_pending_tasks > 0);
319  --total_pending_tasks;
320 
321  /* Could avoid this, but doesn't matter. */
322  memcpy(&rpl, &job->u.reply, sizeof(rpl));
323 
324  tor_assert(rpl.magic == CPUWORKER_REPLY_MAGIC);
325 
326  if (rpl.timed && rpl.success &&
327  rpl.handshake_type <= MAX_ONION_HANDSHAKE_TYPE) {
328  /* Time how long this request took. The handshake_type check should be
329  needless, but let's leave it in to be safe. */
330  struct timeval tv_end, tv_diff;
331  int64_t usec_roundtrip;
332  tor_gettimeofday(&tv_end);
333  timersub(&tv_end, &rpl.started_at, &tv_diff);
334  usec_roundtrip = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
335  if (usec_roundtrip >= 0 &&
336  usec_roundtrip < MAX_BELIEVABLE_ONIONSKIN_DELAY) {
339  onionskins_usec_roundtrip[rpl.handshake_type] += usec_roundtrip;
340  if (onionskins_n_processed[rpl.handshake_type] >= 500000) {
341  /* Scale down every 500000 handshakes. On a busy server, that's
342  * less impressive than it sounds. */
346  }
347  }
348  }
349 
350  circ = job->circ;
351 
352  log_debug(LD_OR,
353  "Unpacking cpuworker reply %p, circ=%p, success=%d",
354  job, circ, rpl.success);
355 
356  if (circ->base_.magic == DEAD_CIRCUIT_MAGIC) {
357  /* The circuit was supposed to get freed while the reply was
358  * pending. Instead, it got left for us to free so that we wouldn't freak
359  * out when the job->circ field wound up pointing to nothing. */
360  log_debug(LD_OR, "Circuit died while reply was pending. Freeing memory.");
361  circ->base_.magic = 0;
362  tor_free(circ);
363  goto done_processing;
364  }
365 
366  circ->workqueue_entry = NULL;
367 
368  if (TO_CIRCUIT(circ)->marked_for_close) {
369  /* We already marked this circuit; we can't call it open. */
370  log_debug(LD_OR,"circuit is already marked.");
371  goto done_processing;
372  }
373 
374  if (rpl.success == 0) {
375  log_debug(LD_OR,
376  "decoding onionskin failed. "
377  "(Old key or bad software.) Closing.");
378  circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_TORPROTOCOL);
379  goto done_processing;
380  }
381 
382  if (onionskin_answer(circ,
383  &rpl.created_cell,
384  (const char*)rpl.keys, sizeof(rpl.keys),
385  rpl.rend_auth_material) < 0) {
386  log_warn(LD_OR,"onionskin_answer failed. Closing.");
387  circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_INTERNAL);
388  goto done_processing;
389  }
390  log_debug(LD_OR,"onionskin_answer succeeded. Yay.");
391 
392  done_processing:
393  memwipe(&rpl, 0, sizeof(rpl));
394  memwipe(job, 0, sizeof(*job));
395  tor_free(job);
397 }
398 
400 static workqueue_reply_t
401 cpuworker_onion_handshake_threadfn(void *state_, void *work_)
402 {
403  worker_state_t *state = state_;
404  cpuworker_job_t *job = work_;
405 
406  /* variables for onion processing */
407  server_onion_keys_t *onion_keys = state->onion_keys;
409  cpuworker_reply_t rpl;
410 
411  memcpy(&req, &job->u.request, sizeof(req));
412 
414  memset(&rpl, 0, sizeof(rpl));
415 
416  const create_cell_t *cc = &req.create_cell;
417  created_cell_t *cell_out = &rpl.created_cell;
418  struct timeval tv_start = {0,0}, tv_end;
419  int n;
420  rpl.timed = req.timed;
421  rpl.started_at = req.started_at;
422  rpl.handshake_type = cc->handshake_type;
423  if (req.timed)
424  tor_gettimeofday(&tv_start);
426  cc->onionskin, cc->handshake_len,
427  onion_keys,
428  cell_out->reply,
429  rpl.keys, CPATH_KEY_MATERIAL_LEN,
430  rpl.rend_auth_material);
431  if (n < 0) {
432  /* failure */
433  log_debug(LD_OR,"onion_skin_server_handshake failed.");
434  memset(&rpl, 0, sizeof(rpl));
435  rpl.success = 0;
436  } else {
437  /* success */
438  log_debug(LD_OR,"onion_skin_server_handshake succeeded.");
439  cell_out->handshake_len = n;
440  switch (cc->cell_type) {
441  case CELL_CREATE:
442  cell_out->cell_type = CELL_CREATED; break;
443  case CELL_CREATE2:
444  cell_out->cell_type = CELL_CREATED2; break;
445  case CELL_CREATE_FAST:
446  cell_out->cell_type = CELL_CREATED_FAST; break;
447  default:
448  tor_assert(0);
449  return WQ_RPL_SHUTDOWN;
450  }
451  rpl.success = 1;
452  }
453  rpl.magic = CPUWORKER_REPLY_MAGIC;
454  if (req.timed) {
455  struct timeval tv_diff;
456  int64_t usec;
457  tor_gettimeofday(&tv_end);
458  timersub(&tv_end, &tv_start, &tv_diff);
459  usec = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
460  if (usec < 0 || usec > MAX_BELIEVABLE_ONIONSKIN_DELAY)
462  else
463  rpl.n_usec = (uint32_t) usec;
464  }
465 
466  memcpy(&job->u.reply, &rpl, sizeof(rpl));
467 
468  memwipe(&req, 0, sizeof(req));
469  memwipe(&rpl, 0, sizeof(req));
470  return WQ_RPL_REPLY;
471 }
472 
474 static void
476 {
477  or_circuit_t *circ;
478  create_cell_t *onionskin = NULL;
479 
480  while (total_pending_tasks < max_pending_tasks) {
481  circ = onion_next_task(&onionskin);
482 
483  if (!circ)
484  return;
485 
486  if (assign_onionskin_to_cpuworker(circ, onionskin) < 0)
487  log_info(LD_OR,"assign_to_cpuworker failed. Ignoring.");
488  }
489 }
490 
493 cpuworker_queue_work,(workqueue_priority_t priority,
494  workqueue_reply_t (*fn)(void *, void *),
495  void (*reply_fn)(void *),
496  void *arg))
497 {
498  tor_assert(threadpool);
499 
500  return threadpool_queue_work_priority(threadpool,
501  priority,
502  fn,
503  reply_fn,
504  arg);
505 }
506 
512 int
514  create_cell_t *onionskin)
515 {
516  workqueue_entry_t *queue_entry;
517  cpuworker_job_t *job;
519  int should_time;
520 
521  tor_assert(threadpool);
522 
523  if (!circ->p_chan) {
524  log_info(LD_OR,"circ->p_chan gone. Failing circ.");
525  tor_free(onionskin);
526  return -1;
527  }
528 
529  if (total_pending_tasks >= max_pending_tasks) {
530  log_debug(LD_OR,"No idle cpuworkers. Queuing.");
531  if (onion_pending_add(circ, onionskin) < 0) {
532  tor_free(onionskin);
533  return -1;
534  }
535  return 0;
536  }
537 
538  if (!channel_is_client(circ->p_chan))
540 
541  should_time = should_time_request(onionskin->handshake_type);
542  memset(&req, 0, sizeof(req));
544  req.timed = should_time;
545 
546  memcpy(&req.create_cell, onionskin, sizeof(create_cell_t));
547 
548  tor_free(onionskin);
549 
550  if (should_time)
551  tor_gettimeofday(&req.started_at);
552 
553  job = tor_malloc_zero(sizeof(cpuworker_job_t));
554  job->circ = circ;
555  memcpy(&job->u.request, &req, sizeof(req));
556  memwipe(&req, 0, sizeof(req));
557 
558  ++total_pending_tasks;
559  queue_entry = threadpool_queue_work_priority(threadpool,
560  WQ_PRI_HIGH,
563  job);
564  if (!queue_entry) {
565  log_warn(LD_BUG, "Couldn't queue work on threadpool");
566  tor_free(job);
567  return -1;
568  }
569 
570  log_debug(LD_OR, "Queued task %p (qe=%p, circ=%p)",
571  job, queue_entry, job->circ);
572 
573  circ->workqueue_entry = queue_entry;
574 
575  return 0;
576 }
577 
580 void
582 {
583  cpuworker_job_t *job;
584  if (circ->workqueue_entry == NULL)
585  return;
586 
588  if (job) {
589  /* It successfully cancelled. */
590  memwipe(job, 0xe0, sizeof(*job));
591  tor_free(job);
592  tor_assert(total_pending_tasks > 0);
593  --total_pending_tasks;
594  /* if (!job), this is done in cpuworker_onion_handshake_replyfn. */
595  circ->workqueue_entry = NULL;
596  }
597 }
int channel_is_client(const channel_t *chan)
Definition: channel.c:2924
static uint64_t onionskins_n_processed[MAX_ONION_HANDSHAKE_TYPE+1]
Definition: cpuworker.c:207
uint32_t magic
Definition: cpuworker.c:135
uint8_t onionskin[CELL_PAYLOAD_SIZE - 4]
Definition: onion.h:32
Common functions for using (pseudo-)random number generators.
unsigned int timed
Definition: cpuworker.c:141
static workqueue_reply_t cpuworker_onion_handshake_threadfn(void *state_, void *work_)
Definition: cpuworker.c:401
uint8_t cell_type
Definition: onion.h:38
uint16_t handshake_len
Definition: onion.h:30
uint16_t handshake_type
Definition: cpuworker.c:143
threadpool_t * threadpool_new(int n_threads, replyqueue_t *replyqueue, void *(*new_thread_state_fn)(void *), void(*free_thread_state_fn)(void *), void *arg)
Definition: workqueue.c:541
struct timeval started_at
Definition: cpuworker.c:124
workqueue_reply_t
Definition: workqueue.h:24
static int should_time_request(uint16_t onionskin_type)
Definition: cpuworker.c:225
struct cpuworker_request_t cpuworker_request_t
server_onion_keys_t * server_onion_keys_new(void)
Definition: onion_crypto.c:51
workqueue_priority_t
Definition: workqueue.h:31
struct timeval started_at
Definition: cpuworker.c:145
static void cpuworker_onion_handshake_replyfn(void *work_)
Definition: cpuworker.c:312
#define TO_CIRCUIT(x)
Definition: or.h:947
Header file for config.c.
uint8_t rend_auth_material[DIGEST_LEN]
Definition: cpuworker.c:160
Header file for cpuworker.c.
Header file for onion.c.
#define crypto_fast_rng_one_in_n(rng, n)
Definition: crypto_rand.h:80
#define tor_free(p)
Definition: malloc.h:52
struct cpuworker_reply_t cpuworker_reply_t
void cpuworker_cancel_circ_handshake(or_circuit_t *circ)
Definition: cpuworker.c:581
void memwipe(void *mem, uint8_t byte, size_t sz)
Definition: crypto_util.c:57
int threadpool_register_reply_event(threadpool_t *tp, void(*cb)(threadpool_t *tp))
Definition: workqueue.c:623
channel_t * p_chan
Definition: or_circuit_st.h:37
created_cell_t created_cell
Definition: cpuworker.c:156
Common functions for cryptographic routines.
Header file for channel.c.
tor_assert(buffer)
static uint64_t onionskins_usec_internal[MAX_ONION_HANDSHAKE_TYPE+1]
Definition: cpuworker.c:211
struct workqueue_entry_s * workqueue_entry
Definition: or_circuit_st.h:30
void cpu_init(void)
Definition: cpuworker.c:83
int threadpool_queue_update(threadpool_t *pool, void *(*dup_fn)(void *), workqueue_reply_t(*fn)(void *, void *), void(*free_fn)(void *), void *arg)
Definition: workqueue.c:437
#define DIGEST_LEN
Definition: digest_sizes.h:20
Master header file for Tor-specific functionality.
uint8_t reply[CELL_PAYLOAD_SIZE - 2]
Definition: onion.h:42
void rep_hist_note_circuit_handshake_assigned(uint16_t type)
Definition: rephist.c:2452
Header file for onion_queue.c.
Header file for circuitbuild.c.
Header file for rephist.c.
uint8_t cell_type
Definition: onion.h:26
static void queue_pending_tasks(void)
Definition: cpuworker.c:475
int onionskin_answer(or_circuit_t *circ, const created_cell_t *created_cell, const char *keys, size_t keys_len, const uint8_t *rend_circ_nonce)
MOCK_IMPL(workqueue_entry_t *, cpuworker_queue_work,(workqueue_priority_t priority, workqueue_reply_t(*fn)(void *, void *), void(*reply_fn)(void *), void *arg))
Definition: cpuworker.c:492
Header file for circuitlist.c.
void cpuworkers_rotate_keyinfo(void)
Definition: cpuworker.c:188
#define LD_OR
Definition: log.h:90
create_cell_t create_cell
Definition: cpuworker.c:127
crypto_fast_rng_t * get_thread_fast_rng(void)
Definition: workqueue.c:95
uint32_t n_usec
Definition: cpuworker.c:149
#define MAX_BELIEVABLE_ONIONSKIN_DELAY
Definition: cpuworker.c:220
uint8_t keys[CPATH_KEY_MATERIAL_LEN]
Definition: cpuworker.c:158
Header file for router.c.
uint64_t estimated_usec_for_onionskins(uint32_t n_requests, uint16_t onionskin_type)
Definition: cpuworker.c:244
uint16_t handshake_type
Definition: onion.h:28
int onion_pending_add(or_circuit_t *circ, create_cell_t *onionskin)
Definition: onion_queue.c:140
#define log_fn(severity, domain, args,...)
Definition: log.h:273
void cpuworker_log_onionskin_overhead(int severity, int onionskin_type, const char *onionskin_type_name)
Definition: cpuworker.c:292
or_circuit_t * onion_next_task(create_cell_t **onionskin_out)
Definition: onion_queue.c:267
int onion_skin_server_handshake(int type, const uint8_t *onion_skin, size_t onionskin_len, const server_onion_keys_t *keys, uint8_t *reply_out, uint8_t *keys_out, size_t keys_out_len, uint8_t *rend_nonce_out)
Definition: onion_crypto.c:174
uint32_t magic
Definition: circuit_st.h:54
workqueue_entry_t * threadpool_queue_work_priority(threadpool_t *pool, workqueue_priority_t prio, workqueue_reply_t(*fn)(void *, void *), void(*reply_fn)(void *), void *arg)
Definition: workqueue.c:386
static int get_overhead_for_onionskins(uint32_t *usec_out, double *frac_out, uint16_t onionskin_type)
Definition: cpuworker.c:265
Header file for connection_or.c.
#define timersub(tv1, tv2, tvout)
Definition: timeval.h:61
uint16_t handshake_len
Definition: onion.h:40
static uint64_t onionskins_usec_roundtrip[MAX_ONION_HANDSHAKE_TYPE+1]
Definition: cpuworker.c:216
Header for workqueue.c.
int get_num_cpus(const or_options_t *options)
Definition: config.c:8021
#define CPUWORKER_REQUEST_MAGIC
Definition: cpuworker.c:113
replyqueue_t * replyqueue_new(uint32_t alertsocks_flags)
Definition: workqueue.c:586
#define LD_BUG
Definition: log.h:84
void * workqueue_entry_cancel(workqueue_entry_t *ent)
Definition: workqueue.c:191
Header file for onion_crypto.c.
int assign_onionskin_to_cpuworker(or_circuit_t *circ, create_cell_t *onionskin)
Definition: cpuworker.c:513