tor  0.4.2.1-alpha-dev
workqueue.c
Go to the documentation of this file.
1 
2 /* copyright (c) 2013-2015, The Tor Project, Inc. */
3 /* See LICENSE for licensing information */
4 
26 #include "orconfig.h"
28 #include "lib/evloop/workqueue.h"
29 
31 #include "lib/intmath/weakrng.h"
32 #include "lib/log/ratelim.h"
33 #include "lib/log/log.h"
34 #include "lib/log/util_bug.h"
35 #include "lib/net/alertsock.h"
36 #include "lib/net/socket.h"
37 #include "lib/thread/threads.h"
38 
39 #include "ext/tor_queue.h"
40 #include <event2/event.h>
41 #include <string.h>
42 
/* Bounds of the priority range. The loops in this file walk the per-priority
 * work queues from WORKQUEUE_PRIORITY_FIRST to WORKQUEUE_PRIORITY_LAST,
 * inclusive. */
43 #define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH
44 #define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW
/* Number of per-priority queues: one more than the value of the last
 * priority. Used as the length of threadpool_s.work[]. */
45 #define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1)
46 
/* Tail-queue head type for lists of workqueue_entry_s. */
47 TOR_TAILQ_HEAD(work_tailq_t, workqueue_entry_s);
48 typedef struct work_tailq_t work_tailq_t;
49 
/**
 * State of a pool of worker threads.
 *
 * NOTE(review): other code in this file also uses pool->threads,
 * pool->condition, pool->lock and pool->reply_queue; the lines declaring
 * those members are not present in this listing.
 */
50 struct threadpool_s {
54 
  /* One queue of pending work per priority, indexed by priority value. */
60  work_tailq_t work[WORKQUEUE_N_PRIORITIES];
61 
  /* Incremented by threadpool_queue_update(); a worker whose own generation
   * differs from this one runs update_fn before taking more work. */
64  unsigned generation;
65 
  /* Function a worker runs, with its state and its update argument, when it
   * notices a new generation. */
67  workqueue_reply_t (*update_fn)(void *, void *);
  /* Used to free update arguments that were never consumed by a worker. */
69  void (*free_update_arg_fn)(void *);
  /* Array of per-thread update arguments, indexed by workerthread index. A
   * worker sets its slot to NULL when it takes the argument. */
71  void **update_args;
  /* Event registered by threadpool_register_reply_event(). */
73  struct event *reply_event;
  /* Optional callback invoked from reply_event_cb() when set. */
74  void (*reply_cb)(threadpool_t *);
75 
  /* Number of entries currently stored in the threads array. */
77  int n_threads;
80 
83 
  /* Called with new_thread_state_arg to build the state for a new thread. */
85  void *(*new_thread_state_fn)(void*);
  /* Called on a freshly built state when launching its thread fails. */
86  void (*free_thread_state_fn)(void*);
  /* Argument handed to new_thread_state_fn. */
87  void *new_thread_state_arg;
88 };
89 
/* Bitfield-compatible spelling of workqueue_priority_t. */
91 #define workqueue_priority_bitfield_t ENUM_BF(workqueue_priority_t)
92 
/* NOTE(review): presumably the bit width of the priority member of
 * workqueue_entry_s; that member's declaration is not visible in this
 * listing -- confirm. */
93 #define WORKQUEUE_PRIORITY_BITS 2
94 
  /* NOTE(review): the struct header line and the declaration of the
   * `priority` member (used as ent->priority elsewhere) are not present in
   * this listing; the members below belong to workqueue_entry_s. */
  /* Link used both for a pool's work queues and for a reply queue's list of
   * answers. */
98  TOR_TAILQ_ENTRY(workqueue_entry_s) next_work;
  /* Pool this entry was queued on; cleared in replyqueue_process() before
   * the reply function runs. */
102  struct threadpool_s *on_pool;
  /* Set to 1 when the entry is queued and to 0 when a worker extracts it;
   * workqueue_entry_cancel() only succeeds while it is nonzero. */
104  uint8_t pending;
  /* Work function, run by a worker thread with its state and arg. */
108  workqueue_reply_t (*fn)(void *state, void *arg);
  /* Reply function, run with arg from replyqueue_process(). */
110  void (*reply_fn)(void *arg);
  /* Caller-supplied argument passed to both fn and reply_fn. */
112  void *arg;
113 };
114 
/**
 * Queue of finished work waiting for its reply function to be run.
 *
 * NOTE(review): queue->lock and queue->answers are used elsewhere in this
 * file, but the lines declaring them are not present in this listing.
 */
115 struct replyqueue_s {
120 
  /* Alert sockets: alert_fn/write_fd are used when an answer is added to an
   * empty queue, drain_fn/read_fd when the queue is processed. */
122  alert_sockets_t alert;
123 };
124 
/**
 * Per-thread bookkeeping for one worker in a thread pool.
 *
 * NOTE(review): the in_pool, reply_queue and lower_priority_chance members
 * and the closing line of this typedef are not present in this listing.
 */
126 typedef struct workerthread_s {
  /* Position of this thread in the pool; used to index update_args. */
128  int index;
  /* Opaque state handed to every work function and update function. */
133  void *state;
  /* Last pool generation this thread has caught up with. */
137  unsigned generation;
141 
/* Forward declaration: worker_thread_main() calls queue_reply() before its
 * definition appears. */
142 static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work);
143 
/**
 * workqueue_entry_new(): allocate a zero-filled workqueue entry that will run
 * <b>fn</b> in a worker and <b>reply_fn</b> afterwards, both with <b>arg</b>.
 * The priority defaults to WQ_PRI_HIGH; on_pool and pending are left zeroed
 * for the caller to set.
 *
 * NOTE(review): the line carrying the function name and first parameter is
 * not present in this listing.
 */
147 static workqueue_entry_t *
149  void (*reply_fn)(void*),
150  void *arg)
151 {
152  workqueue_entry_t *ent = tor_malloc_zero(sizeof(workqueue_entry_t));
153  ent->fn = fn;
154  ent->reply_fn = reply_fn;
155  ent->arg = arg;
156  ent->priority = WQ_PRI_HIGH;
157  return ent;
158 }
159 
/* Free a workqueue entry through workqueue_entry_free_(), via the
 * FREE_AND_NULL wrapper. */
160 #define workqueue_entry_free(ent) \
161  FREE_AND_NULL(workqueue_entry_t, workqueue_entry_free_, (ent))
162 
/**
 * workqueue_entry_free_(): release the storage held by <b>ent</b>. A NULL
 * <b>ent</b> is a no-op. The function pointers and arg stored in the entry
 * are not freed or invoked here.
 *
 * NOTE(review): the line carrying the function name is not present in this
 * listing.
 */
167 static void
169 {
170  if (!ent)
171  return;
  /* Overwrite the entry with a recognizable byte pattern before freeing. */
172  memset(ent, 0xf0, sizeof(*ent));
173  tor_free(ent);
174 }
175 
/**
 * workqueue_entry_cancel(): try to cancel the queued entry <b>ent</b>.
 *
 * If the entry is still pending, unlink it from its pool's queue for its
 * priority, free the entry, and return its arg pointer (the arg itself is
 * not freed here). Otherwise leave the entry alone and return NULL.
 *
 * NOTE(review): the line carrying the function name is not present in this
 * listing.
 */
190 void *
192 {
193  int cancelled = 0;
194  void *result = NULL;
  /* The pending flag and the work queues are only touched under the pool
   * lock. */
195  tor_mutex_acquire(&ent->on_pool->lock);
196  workqueue_priority_t prio = ent->priority;
197  if (ent->pending) {
198  TOR_TAILQ_REMOVE(&ent->on_pool->work[prio], ent, next_work);
199  cancelled = 1;
200  result = ent->arg;
201  }
202  tor_mutex_release(&ent->on_pool->lock);
203 
  /* Free only after the lock (which is reached through ent) is released. */
204  if (cancelled) {
205  workqueue_entry_free(ent);
206  }
207  return result;
208 }
209 
/**
 * worker_thread_has_work(): return nonzero if <b>thread</b> has something to
 * do: either some priority queue of its pool is non-empty, or the pool's
 * generation differs from the thread's (an update is waiting).
 * worker_thread_main() calls this with the pool lock held.
 *
 * NOTE(review): the line carrying the function name is not present in this
 * listing.
 */
213 static int
215 {
216  unsigned i;
217  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
218  if (!TOR_TAILQ_EMPTY(&thread->in_pool->work[i]))
219  return 1;
220  }
221  return thread->generation != thread->in_pool->generation;
222 }
223 
/**
 * worker_thread_extract_next_work(): remove and return the next entry that
 * <b>thread</b> should run, or NULL if every priority queue is empty. The
 * returned entry has its pending flag cleared.
 *
 * Queues are scanned from WORKQUEUE_PRIORITY_FIRST to WORKQUEUE_PRIORITY_LAST.
 * If the scan continues past a non-empty queue, a later non-empty queue
 * replaces the earlier choice.
 *
 * NOTE(review): the line carrying the function name, and the first line of
 * the inner `if` condition (original line 238), are not present in this
 * listing. The condition involves thread->lower_priority_chance; its exact
 * form should be confirmed against the real source.
 */
228 static workqueue_entry_t *
230 {
231  threadpool_t *pool = thread->in_pool;
232  work_tailq_t *queue = NULL, *this_queue;
233  unsigned i;
234  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
235  this_queue = &pool->work[i];
236  if (!TOR_TAILQ_EMPTY(this_queue)) {
237  queue = this_queue;
239  thread->lower_priority_chance)) {
240  /* Usually we'll just break now, so that we can get out of the loop
241  * and use the queue where we found work. But with a small
242  * probability, we'll keep looking for lower priority work, so that
243  * we don't ignore our low-priority queues entirely. */
244  break;
245  }
246  }
247  }
248 
249  if (queue == NULL)
250  return NULL;
251 
  /* Pop the head of the chosen queue and mark it as no longer cancellable. */
252  workqueue_entry_t *work = TOR_TAILQ_FIRST(queue);
253  TOR_TAILQ_REMOVE(queue, work, next_work);
254  work->pending = 0;
255  return work;
256 }
257 
/**
 * Main loop of a worker thread. <b>thread_</b> is the workerthread_t for
 * this thread.
 *
 * The loop holds the pool lock except while running an update function or a
 * work function. While there is work: if the pool generation has changed,
 * take this thread's update argument and run the update function; otherwise
 * run the next work item and hand it to the reply queue. When there is
 * nothing to do, wait on the pool condition. The thread returns (without
 * holding the lock) as soon as an update or work function returns anything
 * other than WQ_RPL_REPLY.
 *
 * NOTE(review): the declaration of `work` (original line 266) and the
 * statement assigning it (original line 291) are not present in this
 * listing; presumably it comes from worker_thread_extract_next_work() --
 * confirm.
 */
261 static void
262 worker_thread_main(void *thread_)
263 {
264  workerthread_t *thread = thread_;
265  threadpool_t *pool = thread->in_pool;
267  workqueue_reply_t result;
268 
269  tor_mutex_acquire(&pool->lock);
270  while (1) {
271  /* lock must be held at this point. */
272  while (worker_thread_has_work(thread)) {
273  /* lock must be held at this point. */
274  if (thread->in_pool->generation != thread->generation) {
  /* Take ownership of our update argument: NULLing the slot keeps
   * threadpool_queue_update() from freeing it later. */
275  void *arg = thread->in_pool->update_args[thread->index];
276  thread->in_pool->update_args[thread->index] = NULL;
277  workqueue_reply_t (*update_fn)(void*,void*) =
278  thread->in_pool->update_fn;
279  thread->generation = thread->in_pool->generation;
280  tor_mutex_release(&pool->lock);
281 
282  workqueue_reply_t r = update_fn(thread->state, arg);
283 
284  if (r != WQ_RPL_REPLY) {
285  return;
286  }
287 
288  tor_mutex_acquire(&pool->lock);
289  continue;
290  }
  /* Leaving the inner loop here keeps the lock held and falls through to
   * the condition wait below. */
292  if (BUG(work == NULL))
293  break;
294  tor_mutex_release(&pool->lock);
295 
296  /* We run the work function without holding the thread lock. This
297  * is the main thread's first opportunity to give us more work. */
298  result = work->fn(thread->state, work->arg);
299 
300  /* Queue the reply for the main thread. */
301  queue_reply(thread->reply_queue, work);
302 
303  /* We may need to exit the thread. */
304  if (result != WQ_RPL_REPLY) {
305  return;
306  }
307  tor_mutex_acquire(&pool->lock);
308  }
309  /* At this point the lock is held, and there is no work in this thread's
310  * queue. */
311 
312  /* TODO: support an idle-function */
313 
314  /* Okay. Now, wait till somebody has work for us. */
315  if (tor_cond_wait(&pool->condition, &pool->lock, NULL) < 0) {
316  log_warn(LD_GENERAL, "Fail tor_cond_wait.");
317  }
318  }
319 }
320 
/**
 * queue_reply(): append the finished entry <b>work</b> to the answers list
 * of <b>queue</b>. If the list was empty beforehand, call the queue's
 * alert function on its write fd. A failure from the alert function is
 * currently ignored.
 *
 * NOTE(review): the line carrying the function name is not present in this
 * listing.
 */
323 static void
325 {
326  int was_empty;
327  tor_mutex_acquire(&queue->lock);
328  was_empty = TOR_TAILQ_EMPTY(&queue->answers);
329  TOR_TAILQ_INSERT_TAIL(&queue->answers, work, next_work);
330  tor_mutex_release(&queue->lock);
331 
  /* The alert is sent only on the empty -> non-empty transition, and after
   * the queue lock has been released. */
332  if (was_empty) {
333  if (queue->alert.alert_fn(queue->alert.write_fd) < 0) {
334  /* XXXX complain! */
335  }
336  }
337 }
338 
341 static workerthread_t *
342 workerthread_new(int32_t lower_priority_chance,
343  void *state, threadpool_t *pool, replyqueue_t *replyqueue)
344 {
345  workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t));
346  thr->state = state;
347  thr->reply_queue = replyqueue;
348  thr->in_pool = pool;
349  thr->lower_priority_chance = lower_priority_chance;
350 
351  if (spawn_func(worker_thread_main, thr) < 0) {
352  //LCOV_EXCL_START
353  tor_assert_nonfatal_unreached();
354  log_err(LD_GENERAL, "Can't launch worker thread.");
355  tor_free(thr);
356  return NULL;
357  //LCOV_EXCL_STOP
358  }
359 
360  return thr;
361 }
362 
/**
 * threadpool_queue_work_priority(): queue an item of work on <b>pool</b> at
 * priority <b>prio</b>. A worker will run <b>fn</b> with its state and
 * <b>arg</b>; <b>reply_fn</b> is later run with <b>arg</b> when the reply
 * queue is processed. Returns the new entry, which is marked pending.
 * Asserts that <b>prio</b> lies within the valid priority range.
 *
 * NOTE(review): the lines carrying the return type, function name and first
 * two parameters are not present in this listing, and neither is original
 * line 404 between the insert and the unlock. That line presumably wakes a
 * waiting worker -- confirm.
 */
388  workqueue_reply_t (*fn)(void *, void *),
389  void (*reply_fn)(void *),
390  void *arg)
391 {
392  tor_assert(((int)prio) >= WORKQUEUE_PRIORITY_FIRST &&
393  ((int)prio) <= WORKQUEUE_PRIORITY_LAST);
394 
395  workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg);
396  ent->on_pool = pool;
397  ent->pending = 1;
398  ent->priority = prio;
399 
400  tor_mutex_acquire(&pool->lock);
401 
402  TOR_TAILQ_INSERT_TAIL(&pool->work[prio], ent, next_work);
403 
405 
406  tor_mutex_release(&pool->lock);
407 
408  return ent;
409 }
410 
/**
 * threadpool_queue_work(): convenience wrapper that queues work on
 * <b>pool</b> at priority WQ_PRI_HIGH. See threadpool_queue_work_priority().
 *
 * NOTE(review): the lines carrying the return type, function name and first
 * parameter are not present in this listing.
 */
414  workqueue_reply_t (*fn)(void *, void *),
415  void (*reply_fn)(void *),
416  void *arg)
417 {
418  return threadpool_queue_work_priority(pool, WQ_PRI_HIGH, fn, reply_fn, arg);
419 }
420 
/**
 * threadpool_queue_update(): schedule <b>fn</b> to be run once by every
 * worker thread in the pool.
 *
 * Each thread gets its own argument: <b>dup_fn</b>(<b>arg</b>) when
 * <b>dup_fn</b> is non-NULL, otherwise <b>arg</b> itself. The pool's
 * generation is bumped so workers notice the update. Arguments left over
 * from a previous update that no worker consumed are released with the
 * previous free function, and <b>free_fn</b> is stored for the next round.
 * Always returns 0.
 *
 * NOTE(review): the line carrying the function name and first parameter is
 * not present in this listing, and neither is original line 466 just before
 * the unlock. That line presumably wakes the waiting workers -- confirm.
 */
436 int
438  void *(*dup_fn)(void *),
439  workqueue_reply_t (*fn)(void *, void *),
440  void (*free_fn)(void *),
441  void *arg)
442 {
443  int i, n_threads;
444  void (*old_args_free_fn)(void *arg);
445  void **old_args;
446  void **new_args;
447 
448  tor_mutex_acquire(&pool->lock);
  /* Snapshot the previous update's state so it can be freed after the lock
   * is dropped. */
449  n_threads = pool->n_threads;
450  old_args = pool->update_args;
451  old_args_free_fn = pool->free_update_arg_fn;
452 
453  new_args = tor_calloc(n_threads, sizeof(void*));
454  for (i = 0; i < n_threads; ++i) {
455  if (dup_fn)
456  new_args[i] = dup_fn(arg);
457  else
458  new_args[i] = arg;
459  }
460 
461  pool->update_args = new_args;
462  pool->free_update_arg_fn = free_fn;
463  pool->update_fn = fn;
464  ++pool->generation;
465 
467 
468  tor_mutex_release(&pool->lock);
469 
  /* Slots that a worker already consumed were set to NULL and are skipped. */
470  if (old_args) {
471  for (i = 0; i < n_threads; ++i) {
472  if (old_args[i] && old_args_free_fn)
473  old_args_free_fn(old_args[i]);
474  }
475  tor_free(old_args);
476  }
477 
478  return 0;
479 }
480 
/* Upper bound on the thread count: threadpool_start_threads() clamps its
 * request to this value. */
482 #define MAX_THREADS 1024
483 
/* Values stored in a worker's lower_priority_chance.
 * threadpool_start_threads() gives CHANCE_STRICT to threads created at an
 * odd index and CHANCE_PERMISSIVE to those created at an even index.
 * NOTE(review): presumably the N of a one-in-N chance of looking past
 * higher-priority work -- confirm against worker_thread_extract_next_work. */
489 #define CHANCE_PERMISSIVE 37
490 #define CHANCE_STRICT INT32_MAX
491 
/**
 * threadpool_start_threads(): launch worker threads until <b>pool</b> has
 * <b>n</b> of them. <b>n</b> is clamped to MAX_THREADS; a negative <b>n</b>
 * is a bug and yields -1. Nothing is started if the pool already has at
 * least <b>n</b> threads. Returns 0 on success, or -1 if a thread could not
 * be created; threads started before the failure remain in the pool.
 *
 * NOTE(review): the line carrying the function name and parameters is not
 * present in this listing.
 */
493 static int
495 {
496  if (BUG(n < 0))
497  return -1; // LCOV_EXCL_LINE
498  if (n > MAX_THREADS)
499  n = MAX_THREADS;
500 
501  tor_mutex_acquire(&pool->lock);
502 
  /* Grow the thread array up front so the loop below can simply append. */
503  if (pool->n_threads < n)
504  pool->threads = tor_reallocarray(pool->threads,
505  sizeof(workerthread_t*), n);
506 
507  while (pool->n_threads < n) {
508  /* For half of our threads, we'll choose lower priorities permissively;
509  * for the other half, we'll stick more strictly to higher priorities.
510  * This keeps slow low-priority tasks from taking over completely. */
511  int32_t chance = (pool->n_threads & 1) ? CHANCE_STRICT : CHANCE_PERMISSIVE;
512 
513  void *state = pool->new_thread_state_fn(pool->new_thread_state_arg);
514  workerthread_t *thr = workerthread_new(chance,
515  state, pool, pool->reply_queue);
516 
517  if (!thr) {
518  //LCOV_EXCL_START
519  tor_assert_nonfatal_unreached();
520  pool->free_thread_state_fn(state);
521  tor_mutex_release(&pool->lock);
522  return -1;
523  //LCOV_EXCL_STOP
524  }
  /* The index is assigned while this function still holds the pool lock. */
525  thr->index = pool->n_threads;
526  pool->threads[pool->n_threads++] = thr;
527  }
528  tor_mutex_release(&pool->lock);
529 
530  return 0;
531 }
532 
/**
 * threadpool_new(): create a thread pool that posts its results to
 * <b>replyqueue</b> and start its worker threads.
 *
 * <b>arg</b> is stored as the argument for the per-thread state constructor
 * and <b>free_thread_state_fn</b> as the matching destructor. Returns the
 * new pool, or NULL if the threads could not be started.
 *
 * NOTE(review): the line carrying the function name and first parameter is
 * not present in this listing, and neither are original lines 549 and 556.
 * Since the failure path uninitializes pool->lock and the
 * <b>new_thread_state_fn</b> parameter is otherwise unused here, those lines
 * presumably initialize the lock and store new_thread_state_fn -- confirm.
 */
540 threadpool_t *
542  replyqueue_t *replyqueue,
543  void *(*new_thread_state_fn)(void*),
544  void (*free_thread_state_fn)(void*),
545  void *arg)
546 {
547  threadpool_t *pool;
548  pool = tor_malloc_zero(sizeof(threadpool_t));
550  tor_cond_init(&pool->condition);
551  unsigned i;
552  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
553  TOR_TAILQ_INIT(&pool->work[i]);
554  }
555 
557  pool->new_thread_state_arg = arg;
558  pool->free_thread_state_fn = free_thread_state_fn;
559  pool->reply_queue = replyqueue;
560 
561  if (threadpool_start_threads(pool, n_threads) < 0) {
562  //LCOV_EXCL_START
563  tor_assert_nonfatal_unreached();
564  tor_cond_uninit(&pool->condition);
565  tor_mutex_uninit(&pool->lock);
566  tor_free(pool);
567  return NULL;
568  //LCOV_EXCL_STOP
569  }
570 
571  return pool;
572 }
573 
/**
 * threadpool_get_replyqueue(): return the reply queue that the pool
 * <b>tp</b> posts its finished work to.
 *
 * NOTE(review): the line carrying the function name and parameter is not
 * present in this listing.
 */
575 replyqueue_t *
577 {
578  return tp->reply_queue;
579 }
580 
585 replyqueue_t *
586 replyqueue_new(uint32_t alertsocks_flags)
587 {
588  replyqueue_t *rq;
589 
590  rq = tor_malloc_zero(sizeof(replyqueue_t));
591  if (alert_sockets_create(&rq->alert, alertsocks_flags) < 0) {
592  //LCOV_EXCL_START
593  tor_free(rq);
594  return NULL;
595  //LCOV_EXCL_STOP
596  }
597 
598  tor_mutex_init(&rq->lock);
599  TOR_TAILQ_INIT(&rq->answers);
600 
601  return rq;
602 }
603 
/**
 * Libevent callback for a pool's reply event. <b>arg</b> is the
 * threadpool_t; <b>sock</b> and <b>events</b> are unused. If the pool has a
 * reply_cb registered, it is invoked with the pool.
 *
 * NOTE(review): original line 612 is not present in this listing;
 * presumably it processes the pool's reply queue before the callback runs
 * -- confirm.
 */
606 static void
607 reply_event_cb(evutil_socket_t sock, short events, void *arg)
608 {
609  threadpool_t *tp = arg;
610  (void) sock;
611  (void) events;
613  if (tp->reply_cb)
614  tp->reply_cb(tp);
615 }
616 
/**
 * threadpool_register_reply_event(): register a persistent read event on
 * the read side of the pool's reply-queue alert socket, using the libevent
 * base from tor_libevent_get_base(). Any previously registered reply event
 * is freed first. <b>cb</b> (which may be NULL) is stored as the pool's
 * reply_cb. Returns the result of event_add().
 *
 * NOTE(review): the line carrying the function name and first parameter is
 * not present in this listing, and neither is the callback argument to
 * tor_event_new() (original line 634), presumably reply_event_cb --
 * confirm.
 */
622 int
624  void (*cb)(threadpool_t *tp))
625 {
626  struct event_base *base = tor_libevent_get_base();
627 
628  if (tp->reply_event) {
629  tor_event_free(tp->reply_event);
630  }
631  tp->reply_event = tor_event_new(base,
632  tp->reply_queue->alert.read_fd,
633  EV_READ|EV_PERSIST,
635  tp);
636  tor_assert(tp->reply_event);
637  tp->reply_cb = cb;
638  return event_add(tp->reply_event, NULL);
639 }
640 
/**
 * replyqueue_process(): run the reply function of every entry currently in
 * <b>queue</b>, then free each entry.
 *
 * The alert socket is drained first; a drain failure is only logged (rate
 * limited) and processing continues. The queue lock is released while each
 * reply function runs.
 *
 * NOTE(review): the line carrying the function name and parameter is not
 * present in this listing.
 */
646 void
648 {
649  int r = queue->alert.drain_fn(queue->alert.read_fd);
650  if (r < 0) {
651  //LCOV_EXCL_START
652  static ratelim_t warn_limit = RATELIM_INIT(7200);
653  log_fn_ratelim(&warn_limit, LOG_WARN, LD_GENERAL,
654  "Failure from drain_fd: %s",
655  tor_socket_strerror(-r));
656  //LCOV_EXCL_STOP
657  }
658 
659  tor_mutex_acquire(&queue->lock);
660  while (!TOR_TAILQ_EMPTY(&queue->answers)) {
661  /* lock must be held at this point.*/
662  workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers);
663  TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
664  tor_mutex_release(&queue->lock);
  /* The entry is unlinked and no longer associated with a pool. */
665  work->on_pool = NULL;
666 
667  work->reply_fn(work->arg);
668  workqueue_entry_free(work);
669 
670  tor_mutex_acquire(&queue->lock);
671  }
672 
673  tor_mutex_release(&queue->lock);
674 }
unsigned generation
Definition: workqueue.c:137
static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work)
Definition: workqueue.c:324
void replyqueue_process(replyqueue_t *queue)
Definition: workqueue.c:647
workqueue_entry_t * threadpool_queue_work(threadpool_t *pool, workqueue_reply_t(*fn)(void *, void *), void(*reply_fn)(void *), void *arg)
Definition: workqueue.c:413
void tor_mutex_release(tor_mutex_t *m)
Common functions for using (pseudo-)random number generators.
int32_t lower_priority_chance
Definition: workqueue.c:139
static workqueue_entry_t * workqueue_entry_new(workqueue_reply_t(*fn)(void *, void *), void(*reply_fn)(void *), void *arg)
Definition: workqueue.c:148
void tor_cond_signal_all(tor_cond_t *cond)
void tor_cond_uninit(tor_cond_t *cond)
Summarize similar messages that would otherwise flood the logs.
threadpool_t * threadpool_new(int n_threads, replyqueue_t *replyqueue, void *(*new_thread_state_fn)(void *), void(*free_thread_state_fn)(void *), void *arg)
Definition: workqueue.c:541
#define LD_GENERAL
Definition: log.h:60
int tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex, const struct timeval *tv)
tor_mutex_t lock
Definition: workqueue.c:79
workqueue_reply_t
Definition: workqueue.h:24
static void worker_thread_main(void *thread_)
Definition: workqueue.c:262
workqueue_priority_t
Definition: workqueue.h:31
tor_cond_t condition
Definition: workqueue.c:57
void *(* new_thread_state_fn)(void *)
Definition: workqueue.c:85
#define crypto_fast_rng_one_in_n(rng, n)
Definition: crypto_rand.h:80
#define tor_free(p)
Definition: malloc.h:52
void tor_mutex_init_nonrecursive(tor_mutex_t *m)
void tor_mutex_uninit(tor_mutex_t *m)
int threadpool_register_reply_event(threadpool_t *tp, void(*cb)(threadpool_t *tp))
Definition: workqueue.c:623
Header for threads.c.
#define MAX_THREADS
Definition: workqueue.c:482
tor_assert(buffer)
#define WORKQUEUE_PRIORITY_BITS
Definition: workqueue.c:93
workqueue_reply_t(* update_fn)(void *, void *)
Definition: workqueue.c:67
int threadpool_queue_update(threadpool_t *pool, void *(*dup_fn)(void *), workqueue_reply_t(*fn)(void *, void *), void(*free_fn)(void *), void *arg)
Definition: workqueue.c:437
replyqueue_t * reply_queue
Definition: workqueue.c:82
static void workqueue_entry_free_(workqueue_entry_t *ent)
Definition: workqueue.c:168
void tor_mutex_init(tor_mutex_t *m)
#define CHANCE_PERMISSIVE
Definition: workqueue.c:489
static workerthread_t * workerthread_new(int32_t lower_priority_chance, void *state, threadpool_t *pool, replyqueue_t *replyqueue)
Definition: workqueue.c:342
replyqueue_t * reply_queue
Definition: workqueue.c:135
#define LOG_WARN
Definition: log.h:51
static workqueue_entry_t * worker_thread_extract_next_work(workerthread_t *thread)
Definition: workqueue.c:229
static void reply_event_cb(evutil_socket_t sock, short events, void *arg)
Definition: workqueue.c:607
void ** update_args
Definition: workqueue.c:71
#define log_fn_ratelim(ratelim, severity, domain, args,...)
Definition: log.h:279
Header for weakrng.c.
struct workerthread_s ** threads
Definition: workqueue.c:53
tor_mutex_t lock
Definition: workqueue.c:117
int n_threads
Definition: workqueue.c:77
struct workerthread_s workerthread_t
int alert_sockets_create(alert_sockets_t *socks_out, uint32_t flags)
Definition: alertsock.c:191
crypto_fast_rng_t * get_thread_fast_rng(void)
Definition: workqueue.c:95
int tor_cond_init(tor_cond_t *cond)
work_tailq_t work[WORKQUEUE_N_PRIORITIES]
Definition: workqueue.c:60
static int threadpool_start_threads(threadpool_t *pool, int n)
Definition: workqueue.c:494
struct event * reply_event
Definition: workqueue.c:73
static TOR_TAILQ_HEAD(onion_queue_head_t, onion_queue_t)
Definition: onion_queue.c:54
void tor_mutex_acquire(tor_mutex_t *m)
#define workqueue_priority_bitfield_t
Definition: workqueue.c:91
workqueue_entry_t * threadpool_queue_work_priority(threadpool_t *pool, workqueue_priority_t prio, workqueue_reply_t(*fn)(void *, void *), void(*reply_fn)(void *), void *arg)
Definition: workqueue.c:386
void * state
Definition: workqueue.c:133
unsigned generation
Definition: workqueue.c:64
replyqueue_t * threadpool_get_replyqueue(threadpool_t *tp)
Definition: workqueue.c:576
Headers for log.c.
Header for socket.c.
Header for compat_libevent.c.
void tor_cond_signal_one(tor_cond_t *cond)
Header for workqueue.c.
Header for alertsock.c.
Macros to manage assertions, fatal and non-fatal.
replyqueue_t * replyqueue_new(uint32_t alertsocks_flags)
Definition: workqueue.c:586
static int worker_thread_has_work(workerthread_t *thread)
Definition: workqueue.c:214
void * workqueue_entry_cancel(workqueue_entry_t *ent)
Definition: workqueue.c:191
struct threadpool_s * in_pool
Definition: workqueue.c:130
int spawn_func(void(*func)(void *), void *data)
void(* free_update_arg_fn)(void *)
Definition: workqueue.c:69