/* Copyright (c) 2003-2004, Roger Dingledine.
 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
 * Copyright (c) 2007-2018, The Tor Project, Inc. */
/* See LICENSE for licensing information */

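/**
 * \file cpuworker.c
 * \brief Uses the workqueue/threadpool code to farm CPU-intensive activities
 *   out to worker threads.
 *
 * The multithreading backend for this module is in workqueue.c; this module
 * specializes workqueue.c. Right now, we use this infrastructure for
 * processing onionskins.
 **/
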
#include "core/or/or.h"
#include "core/or/channel.h"
#include "core/or/circuitbuild.h"
#include "core/or/circuitlist.h"
#include "core/or/connection_or.h"
#include "app/config/config.h"
#include "core/mainloop/cpuworker.h"
#include "lib/crypt_ops/crypto_rand.h"
#include "lib/crypt_ops/crypto_util.h"
#include "core/or/onion.h"
#include "core/or/onion_queue.h"
#include "feature/stats/rephist.h"
#include "feature/relay/router.h"
#include "lib/evloop/workqueue.h"
#include "core/crypto/onion_crypto.h"

#include "core/or/or_circuit_st.h"
#include "lib/intmath/weakrng.h"

static void queue_pending_tasks(void);

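/** The state held by each worker thread: the onion keys it should use for
 * handshakes, plus a generation counter that is bumped on each key
 * rotation. */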
typedef struct worker_state_s {
  int generation;
  server_onion_keys_t *onion_keys;
} worker_state_t;

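/** Allocate and return a fresh worker_state_t; used by the threadpool as
 * its per-thread state constructor. */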
static void *
worker_state_new(void *arg)
{
  worker_state_t *ws;
  (void)arg;
  ws = tor_malloc_zero(sizeof(worker_state_t));
  ws->onion_keys = server_onion_keys_new();
  return ws;
}

#define worker_state_free(ws) \
  FREE_AND_NULL(worker_state_t, worker_state_free_, (ws))

static void
worker_state_free_(worker_state_t *ws)
{
  if (!ws)
    return;
  server_onion_keys_free(ws->onion_keys);
  tor_free(ws);
}

static void
worker_state_free_void(void *arg)
{
  worker_state_free_(arg);
}

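/** The queue on which worker threads post their replies for the main
 * thread, and the pool of worker threads itself. */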
static replyqueue_t *replyqueue = NULL;
static threadpool_t *threadpool = NULL;

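/** A weak (non-cryptographic) RNG, used only to decide which handshake
 * requests to time. */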
static tor_weak_rng_t request_sample_rng = TOR_WEAK_RNG_INIT;

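/** How many tasks we have handed to the threadpool but not yet received
 * replies for, and the maximum number we are willing to have pending at
 * once. */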
static int total_pending_tasks = 0;
static int max_pending_tasks = 128;

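/** Initialize the cpuworker subsystem. It is OK to call this more than once
 * during Tor's lifetime. */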
void
cpu_init(void)
{
  if (!replyqueue) {
    replyqueue = replyqueue_new(0);
  }
  if (!threadpool) {
    /*
      In our threadpool implementation, half the threads are permissive and
      half are strict (when it comes to running lower-priority tasks). So we
      always make sure we have at least two threads, so that there will be at
      least one thread of each kind.
    */
    const int n_threads = get_num_cpus(get_options()) + 1;
    threadpool = threadpool_new(n_threads,
                                replyqueue,
                                worker_state_new,
                                worker_state_free_void,
                                NULL);

    int r = threadpool_register_reply_event(threadpool, NULL);

    tor_assert(r == 0);
  }

  /* Total voodoo. Can we make this more sensible? */
  max_pending_tasks = get_num_cpus(get_options()) * 64;
  crypto_seed_weak_rng(&request_sample_rng);
}

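/** Magic numbers for the request and reply structs, used to catch memory
 * errors and type confusion between the two. */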
#define CPUWORKER_REQUEST_MAGIC 0xda4afeed
#define CPUWORKER_REPLY_MAGIC 0x5eedf00d

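/** A request sent to a worker thread. */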
typedef struct cpuworker_request_t {
  /** Magic number; must be CPUWORKER_REQUEST_MAGIC. */
  uint32_t magic;

  /** Flag: are we timing this request? */
  unsigned timed : 1;
  /** If we're timing this request, when was it sent to the worker? */
  struct timeval started_at;

  /** A create cell for the worker to process. */
  create_cell_t create_cell;

  /* Turn the above into a tagged union if needed. */
} cpuworker_request_t;

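/** A reply sent back by a worker thread. */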
typedef struct cpuworker_reply_t {
  /** Magic number; must be CPUWORKER_REPLY_MAGIC. */
  uint32_t magic;

  /** True iff we successfully processed the handshake. */
  uint8_t success;

  /** Are we timing this request? */
  unsigned int timed : 1;
  /** What handshake type was the request? (Used for timing.) */
  uint16_t handshake_type;
  /** When did we send the request to the worker? */
  struct timeval started_at;
  /** How many microseconds did the worker spend on this request? */
  uint32_t n_usec;

  /** The created cell to send back, the keys to use on this circuit, and
   * the input to use when authenticating introduce1 cells. */
  created_cell_t created_cell;
  uint8_t keys[CPATH_KEY_MATERIAL_LEN];
  uint8_t rend_auth_material[DIGEST_LEN];
} cpuworker_reply_t;

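/** A work item for the threadpool: the circuit it belongs to, plus either
 * the request we send to a worker or the reply we get back. */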
typedef struct cpuworker_job_u {
  or_circuit_t *circ;
  union {
    cpuworker_request_t request;
    cpuworker_reply_t reply;
  } u;
} cpuworker_job_t;

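/** Runs on a worker thread: take ownership of the fresh onion keys in the
 * update argument, free the old ones, and bump the state's generation
 * counter. */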
static workqueue_reply_t
update_state_threadfn(void *state_, void *work_)
{
  worker_state_t *state = state_;
  worker_state_t *update = work_;
  server_onion_keys_free(state->onion_keys);
  state->onion_keys = update->onion_keys;
  update->onion_keys = NULL;
  worker_state_free(update);
  ++state->generation;
  return WQ_RPL_REPLY;
}

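/** Called when our onion keys change: queue a task to make the worker
 * threads replace their copies with fresh ones. */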
void
cpuworkers_rotate_keyinfo(void)
{
  if (!threadpool) {
    /* If we're a client, then we won't have cpuworkers, and we won't need
     * to tell them to rotate their state.
     */
    return;
  }
  if (threadpool_queue_update(threadpool,
                              worker_state_new,
                              update_state_threadfn,
                              worker_state_free_void,
                              NULL)) {
    log_warn(LD_OR, "Failed to queue key update for worker threads.");
  }
}

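/** Onionskin timing statistics, indexed by handshake type: how many
 * onionskins we have processed, how many microseconds the workers spent on
 * them, and how many microseconds they took end to end (queueing delay
 * included). */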
static uint64_t onionskins_n_processed[MAX_ONION_HANDSHAKE_TYPE+1];
static uint64_t onionskins_usec_internal[MAX_ONION_HANDSHAKE_TYPE+1];
static uint64_t onionskins_usec_roundtrip[MAX_ONION_HANDSHAKE_TYPE+1];

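/** Don't let an onionskin timing measurement above this many microseconds
 * influence our statistics: assume something strange happened, like a clock
 * jump or the machine being suspended. */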
#define MAX_BELIEVABLE_ONIONSKIN_DELAY (2*1000*1000)

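/** Return true iff we should record timing information for a handshake of
 * type <b>onionskin_type</b>. */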
static int
should_time_request(uint16_t onionskin_type)
{
  /* If we've never heard of this type, we shouldn't even be here. */
  if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE)
    return 0;
  /* Measure the first N handshakes of each type, to ensure we have a
   * sample. */
  if (onionskins_n_processed[onionskin_type] < 4096)
    return 1;
  /* After that, sample with probability 1/128: timing every request would
   * itself cost time. */
  return tor_weak_random_one_in_n(&request_sample_rng, 128);
}

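/** Return an estimate of how many microseconds we will need to process
 * <b>n_requests</b> onionskins of type <b>onionskin_type</b>. */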
uint64_t
estimated_usec_for_onionskins(uint32_t n_requests, uint16_t onionskin_type)
{
  if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */
    return 1000 * (uint64_t)n_requests;
  if (PREDICT_UNLIKELY(onionskins_n_processed[onionskin_type] < 100)) {
    /* Until we have 100 data points, just assume everything takes 1 msec. */
    return 1000 * (uint64_t)n_requests;
  } else {
    /* This can't overflow: we'll never have more than 500000 onionskins
     * measured in onionskins_usec_internal, and they won't take anything near
     * 1 sec each, and we won't have anything like 1 million queued
     * onionskins. But that's 5e5 * 1e6 * 1e6, which is still less than
     * UINT64_MAX. */
    return (onionskins_usec_internal[onionskin_type] * n_requests) /
      onionskins_n_processed[onionskin_type];
  }
}

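/** Compute the per-onionskin overhead of the cpuworker framework for
 * onionskins of type <b>onionskin_type</b>: set *<b>usec_out</b> to the
 * average microseconds of overhead per onionskin, and *<b>frac_out</b> to
 * the overhead as a fraction of the time spent on actual computation.
 * Return 0 on success, -1 if we have no data yet. */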
static int
get_overhead_for_onionskins(uint32_t *usec_out, double *frac_out,
                            uint16_t onionskin_type)
{
  uint64_t overhead;

  *usec_out = 0;
  *frac_out = 0.0;

  if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */
    return -1;
  if (onionskins_n_processed[onionskin_type] == 0 ||
      onionskins_usec_internal[onionskin_type] == 0 ||
      onionskins_usec_roundtrip[onionskin_type] == 0)
    return -1;

  overhead = onionskins_usec_roundtrip[onionskin_type] -
    onionskins_usec_internal[onionskin_type];

  *usec_out = (uint32_t)(overhead / onionskins_n_processed[onionskin_type]);
  *frac_out = ((double)overhead) / onionskins_usec_internal[onionskin_type];

  return 0;
}

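/** If we have overhead data for onionskins of type <b>onionskin_type</b>,
 * log it at <b>severity</b>, describing the type as
 * <b>onionskin_type_name</b>. */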
void
cpuworker_log_onionskin_overhead(int severity, int onionskin_type,
                                 const char *onionskin_type_name)
{
  uint32_t overhead;
  double relative_overhead;
  int r;

  r = get_overhead_for_onionskins(&overhead, &relative_overhead,
                                  onionskin_type);
  if (!overhead || r < 0)
    return;

  log_fn(severity, LD_OR,
         "%s onionskins have averaged %u usec overhead (%.2f%%) in "
         "cpuworker code.",
         onionskin_type_name, (unsigned)overhead, relative_overhead*100);
}

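/** Handle a reply from the worker threads: record timing data, then finish
 * building or tear down the circuit the handshake belongs to. */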
static void
cpuworker_onion_handshake_replyfn(void *work_)
{
  cpuworker_job_t *job = work_;
  cpuworker_reply_t rpl;
  or_circuit_t *circ = NULL;

  tor_assert(total_pending_tasks > 0);
  --total_pending_tasks;

  /* Could avoid this, but doesn't matter. */
  memcpy(&rpl, &job->u.reply, sizeof(rpl));

  tor_assert(rpl.magic == CPUWORKER_REPLY_MAGIC);

  if (rpl.timed && rpl.success &&
      rpl.handshake_type <= MAX_ONION_HANDSHAKE_TYPE) {
    /* Time how long this request took. The handshake_type check should be
       needless, but let's leave it in to be safe. */
    struct timeval tv_end, tv_diff;
    int64_t usec_roundtrip;
    tor_gettimeofday(&tv_end);
    timersub(&tv_end, &rpl.started_at, &tv_diff);
    usec_roundtrip = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
    if (usec_roundtrip >= 0 &&
        usec_roundtrip < MAX_BELIEVABLE_ONIONSKIN_DELAY) {
      ++onionskins_n_processed[rpl.handshake_type];
      onionskins_usec_internal[rpl.handshake_type] += rpl.n_usec;
      onionskins_usec_roundtrip[rpl.handshake_type] += usec_roundtrip;
      if (onionskins_n_processed[rpl.handshake_type] >= 500000) {
        /* Scale down every 500000 handshakes. On a busy server, that's
         * less impressive than it sounds. */
        onionskins_n_processed[rpl.handshake_type] /= 2;
        onionskins_usec_internal[rpl.handshake_type] /= 2;
        onionskins_usec_roundtrip[rpl.handshake_type] /= 2;
      }
    }
  }

  circ = job->circ;

  log_debug(LD_OR,
            "Unpacking cpuworker reply %p, circ=%p, success=%d",
            job, circ, rpl.success);

  if (circ->base_.magic == DEAD_CIRCUIT_MAGIC) {
    /* The circuit was supposed to get freed while the reply was
     * pending. Instead, it got left for us to free so that we wouldn't freak
     * out when the job->circ field wound up pointing to nothing. */
    log_debug(LD_OR, "Circuit died while reply was pending. Freeing memory.");
    circ->base_.magic = 0;
    tor_free(circ);
    goto done_processing;
  }

  circ->workqueue_entry = NULL;

  if (TO_CIRCUIT(circ)->marked_for_close) {
    /* We already marked this circuit; we can't call it open. */
    log_debug(LD_OR, "circuit is already marked.");
    goto done_processing;
  }

  if (rpl.success == 0) {
    log_debug(LD_OR,
              "decoding onionskin failed. "
              "(Old key or bad software.) Closing.");
    circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_TORPROTOCOL);
    goto done_processing;
  }

  if (onionskin_answer(circ,
                       &rpl.created_cell,
                       (const char*)rpl.keys, sizeof(rpl.keys),
                       rpl.rend_auth_material) < 0) {
    log_warn(LD_OR, "onionskin_answer failed. Closing.");
    circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_INTERNAL);
    goto done_processing;
  }
  log_debug(LD_OR, "onionskin_answer succeeded. Yay.");

 done_processing:
  memwipe(&rpl, 0, sizeof(rpl));
  memwipe(job, 0, sizeof(*job));
  tor_free(job);
  queue_pending_tasks();
}

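/** Implementation function for onion handshake requests: runs on a worker
 * thread. */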
static workqueue_reply_t
cpuworker_onion_handshake_threadfn(void *state_, void *work_)
{
  worker_state_t *state = state_;
  cpuworker_job_t *job = work_;

  /* variables for onion processing */
  server_onion_keys_t *onion_keys = state->onion_keys;
  cpuworker_request_t req;
  cpuworker_reply_t rpl;

  memcpy(&req, &job->u.request, sizeof(req));

  tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC);
  memset(&rpl, 0, sizeof(rpl));

  const create_cell_t *cc = &req.create_cell;
  created_cell_t *cell_out = &rpl.created_cell;
  struct timeval tv_start = {0,0}, tv_end;
  int n;
  rpl.timed = req.timed;
  rpl.started_at = req.started_at;
  rpl.handshake_type = cc->handshake_type;
  if (req.timed)
    tor_gettimeofday(&tv_start);
  n = onion_skin_server_handshake(cc->handshake_type,
                                  cc->onionskin, cc->handshake_len,
                                  onion_keys,
                                  cell_out->reply,
                                  rpl.keys, CPATH_KEY_MATERIAL_LEN,
                                  rpl.rend_auth_material);
  if (n < 0) {
    /* failure */
    log_debug(LD_OR, "onion_skin_server_handshake failed.");
    memset(&rpl, 0, sizeof(rpl));
    rpl.success = 0;
  } else {
    /* success */
    log_debug(LD_OR, "onion_skin_server_handshake succeeded.");
    cell_out->handshake_len = n;
    switch (cc->cell_type) {
    case CELL_CREATE:
      cell_out->cell_type = CELL_CREATED; break;
    case CELL_CREATE2:
      cell_out->cell_type = CELL_CREATED2; break;
    case CELL_CREATE_FAST:
      cell_out->cell_type = CELL_CREATED_FAST; break;
    default:
      tor_assert(0);
      return WQ_RPL_SHUTDOWN;
    }
    rpl.success = 1;
  }
  rpl.magic = CPUWORKER_REPLY_MAGIC;
  if (req.timed) {
    struct timeval tv_diff;
    int64_t usec;
    tor_gettimeofday(&tv_end);
    timersub(&tv_end, &tv_start, &tv_diff);
    usec = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
    if (usec < 0 || usec > MAX_BELIEVABLE_ONIONSKIN_DELAY)
      rpl.n_usec = MAX_BELIEVABLE_ONIONSKIN_DELAY;
    else
      rpl.n_usec = (uint32_t) usec;
  }

  memcpy(&job->u.reply, &rpl, sizeof(rpl));

  memwipe(&req, 0, sizeof(req));
  memwipe(&rpl, 0, sizeof(rpl));
  return WQ_RPL_REPLY;
}

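/** While we have spare capacity, take pending onionskin tasks from the
 * onion queue and hand them to the threadpool. */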
static void
queue_pending_tasks(void)
{
  or_circuit_t *circ;
  create_cell_t *onionskin = NULL;

  while (total_pending_tasks < max_pending_tasks) {
    circ = onion_next_task(&onionskin);

    if (!circ)
      return;

    if (assign_onionskin_to_cpuworker(circ, onionskin) < 0)
      log_info(LD_OR, "assign_to_cpuworker failed. Ignoring.");
  }
}

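/** Queue a job on the cpuworker threadpool at <b>priority</b>: run
 * <b>fn</b>(thread_state, <b>arg</b>) on a worker thread, and once it
 * returns, run <b>reply_fn</b>(<b>arg</b>) in the main thread. Return the
 * work entry on success, or NULL on failure.
 *
 * A minimal usage sketch (my_workfn and my_replyfn are hypothetical names,
 * invented here for illustration):
 * <pre>
 *   static workqueue_reply_t
 *   my_workfn(void *thread_state, void *arg)
 *   {
 *     // CPU-heavy work on arg, possibly using the worker's thread_state.
 *     return WQ_RPL_REPLY;
 *   }
 *   static void
 *   my_replyfn(void *arg)
 *   {
 *     // Runs in the main thread once my_workfn is done; consume arg here.
 *   }
 *   // Later, from main-thread code:
 *   cpuworker_queue_work(WQ_PRI_LOW, my_workfn, my_replyfn, job);
 * </pre>
 */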
MOCK_IMPL(workqueue_entry_t *,
cpuworker_queue_work,(workqueue_priority_t priority,
                      workqueue_reply_t (*fn)(void *, void *),
                      void (*reply_fn)(void *),
                      void *arg))
{
  tor_assert(threadpool);

  return threadpool_queue_work_priority(threadpool,
                                        priority,
                                        fn,
                                        reply_fn,
                                        arg);
}

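/** Try to tell a cpuworker to perform the public-key operations necessary
 * to respond to <b>onionskin</b> for the circuit <b>circ</b>. Takes
 * ownership of <b>onionskin</b>. Return 0 on success, -1 on failure. */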
int
assign_onionskin_to_cpuworker(or_circuit_t *circ,
                              create_cell_t *onionskin)
{
  workqueue_entry_t *queue_entry;
  cpuworker_job_t *job;
  cpuworker_request_t req;
  int should_time;

  tor_assert(threadpool);

  if (!circ->p_chan) {
    log_info(LD_OR, "circ->p_chan gone. Failing circ.");
    tor_free(onionskin);
    return -1;
  }

  if (total_pending_tasks >= max_pending_tasks) {
    log_debug(LD_OR, "No idle cpuworkers. Queuing.");
    if (onion_pending_add(circ, onionskin) < 0) {
      tor_free(onionskin);
      return -1;
    }
    return 0;
  }

  if (!channel_is_client(circ->p_chan))
    rep_hist_note_circuit_handshake_assigned(onionskin->handshake_type);

  should_time = should_time_request(onionskin->handshake_type);
  memset(&req, 0, sizeof(req));
  req.magic = CPUWORKER_REQUEST_MAGIC;
  req.timed = should_time;

  memcpy(&req.create_cell, onionskin, sizeof(create_cell_t));

  tor_free(onionskin);

  if (should_time)
    tor_gettimeofday(&req.started_at);

  job = tor_malloc_zero(sizeof(cpuworker_job_t));
  job->circ = circ;
  memcpy(&job->u.request, &req, sizeof(req));
  memwipe(&req, 0, sizeof(req));

  ++total_pending_tasks;
  queue_entry = threadpool_queue_work_priority(threadpool,
                                               WQ_PRI_HIGH,
                                               cpuworker_onion_handshake_threadfn,
                                               cpuworker_onion_handshake_replyfn,
                                               job);
  if (!queue_entry) {
    log_warn(LD_BUG, "Couldn't queue work on threadpool");
    tor_free(job);
    return -1;
  }

  log_debug(LD_OR, "Queued task %p (qe=%p, circ=%p)",
            job, queue_entry, job->circ);

  circ->workqueue_entry = queue_entry;

  return 0;
}

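/** If <b>circ</b> has a pending handshake that hasn't been processed yet,
 * try to cancel it on the worker queue. */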
void
cpuworker_cancel_circ_handshake(or_circuit_t *circ)
{
  cpuworker_job_t *job;
  if (circ->workqueue_entry == NULL)
    return;

  job = workqueue_entry_cancel(circ->workqueue_entry);
  if (job) {
    /* It successfully cancelled. */
    memwipe(job, 0xe0, sizeof(*job));
    tor_free(job);
    tor_assert(total_pending_tasks > 0);
    --total_pending_tasks;
    /* if (!job), this is done in cpuworker_onion_handshake_replyfn. */
    circ->workqueue_entry = NULL;
  }
}