Tor  0.4.7.0-alpha-dev
workqueue.c
/* Copyright (c) 2013-2015, The Tor Project, Inc. */
/* See LICENSE for licensing information */

/**
 * \file workqueue.c
 *
 * \brief Implements worker threads, queues of work for them, and mechanisms
 * for them to send answers back to the main thread.
 *
 * The main structure here is a threadpool_t : it manages a set of worker
 * threads, a queue of pending work, and a reply queue. Every piece of work
 * is a workqueue_entry_t, containing data to process and a function to
 * process it with.
 *
 * The main thread informs the worker threads of pending work by using a
 * condition variable. The workers inform the main process of completed work
 * by using an alert_sockets_t object, as implemented in net/alertsock.c.
 *
 * The main thread can also queue an "update" that will be handled by all the
 * workers. This is useful for updating state that all the workers share.
 *
 * In Tor today, there is only one thread pool, used in cpuworker.c.
 */
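
/*
 * Illustrative sketch (not part of the original file): a typical caller
 * wires these pieces together roughly as follows. The state functions and
 * work/reply callbacks are hypothetical placeholders.
 *
 *     replyqueue_t *rq = replyqueue_new(0);
 *     threadpool_t *tp = threadpool_new(4, rq,
 *                                       my_state_new, my_state_free, NULL);
 *     threadpool_register_reply_event(tp, NULL);
 *     // Later, from the main thread:
 *     threadpool_queue_work(tp, my_work_fn, my_reply_fn, job);
 *     // my_work_fn runs in a worker thread; my_reply_fn runs back in the
 *     // main thread once the reply event fires and the queue is drained.
 */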

#include "orconfig.h"
#include "lib/evloop/compat_libevent.h"
#include "lib/evloop/workqueue.h"

#include "lib/crypt_ops/crypto_rand.h"
#include "lib/intmath/weakrng.h"
#include "lib/log/ratelim.h"
#include "lib/log/log.h"
#include "lib/log/util_bug.h"
#include "lib/net/alertsock.h"
#include "lib/net/socket.h"
#include "lib/thread/threads.h"

#include "ext/tor_queue.h"
#include <event2/event.h>
#include <string.h>

#define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH
#define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW
#define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1)

TOR_TAILQ_HEAD(work_tailq_t, workqueue_entry_t);
typedef struct work_tailq_t work_tailq_t;

struct threadpool_t {
  /** An array of pointers to workerthread_t: one for each running worker
   * thread. */
  struct workerthread_t **threads;

  /** Condition variable that we wait on when we have no work, and which
   * gets signaled when our queue becomes nonempty. */
  tor_cond_t condition;
  /** Queues of pending work that we have to do. The queue with priority
   * <b>p</b> is work[p]. */
  work_tailq_t work[WORKQUEUE_N_PRIORITIES];

  /** The current 'update generation' of the threadpool. Any thread that is
   * at an earlier generation needs to run the update function. */
  unsigned generation;

  /** Function that should be run for updates on each thread. */
  workqueue_reply_t (*update_fn)(void *, void *);
  /** Function to free update arguments if they can't be run. */
  void (*free_update_arg_fn)(void *);
  /** Array of n_threads update arguments. */
  void **update_args;
  /** Event to notice when another thread has sent a reply. */
  struct event *reply_event;
  void (*reply_cb)(threadpool_t *);

  /** Number of elements in threads. */
  int n_threads;
  /** Mutex to protect all the above fields. */
  tor_mutex_t lock;

  /** A reply queue to use when constructing new threads. */
  replyqueue_t *reply_queue;

  /** Functions used to allocate and free thread state. */
  void *(*new_thread_state_fn)(void*);
  void (*free_thread_state_fn)(void*);
  void *new_thread_state_arg;
};

/** Used to put a workqueue_priority_t value into a bitfield. */
#define workqueue_priority_bitfield_t ENUM_BF(workqueue_priority_t)
/** Number of bits needed to hold all legal values of workqueue_priority_t */
#define WORKQUEUE_PRIORITY_BITS 2

struct workqueue_entry_t {
  /** The next workqueue_entry_t that's pending on the same thread or
   * reply queue. */
  TOR_TAILQ_ENTRY(workqueue_entry_t) next_work;
  /** The threadpool to which this workqueue_entry_t was assigned. This field
   * is set when the workqueue_entry_t is created, and won't be cleared until
   * after it's handled in the main thread. */
  struct threadpool_t *on_pool;
  /** True iff this entry is waiting for a worker to start processing it. */
  uint8_t pending;
  /** Priority of this entry. */
  workqueue_priority_bitfield_t priority : WORKQUEUE_PRIORITY_BITS;
  /** Function to run in the worker thread. */
  workqueue_reply_t (*fn)(void *state, void *arg);
  /** Function to run while processing the reply queue. */
  void (*reply_fn)(void *arg);
  /** Argument for the above functions. */
  void *arg;
};

struct replyqueue_t {
  /** Mutex to protect the answers field */
  tor_mutex_t lock;
  /** Doubly-linked list of answers that the reply queue needs to handle. */
  TOR_TAILQ_HEAD(, workqueue_entry_t) answers;

  /** Mechanism to wake up the main thread when it is receiving answers. */
  alert_sockets_t alert;
};

/** A worker thread represents a single thread in a thread pool. */
typedef struct workerthread_t {
  /** Which thread is this? In range 0..in_pool->n_threads-1 */
  int index;
  /** The pool this thread is a part of. */
  struct threadpool_t *in_pool;
  /** User-supplied state field that we pass to the worker functions of each
   * work item. */
  void *state;
  /** Reply queue to which we pass our results. */
  replyqueue_t *reply_queue;
  /** The current update generation of this thread */
  unsigned generation;
  /** One over the probability of taking work from a lower-priority queue. */
  int32_t lower_priority_chance;
} workerthread_t;

static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work);

/** Allocate and return a new workqueue_entry_t, set up to run the function
 * <b>fn</b> in the worker thread, and <b>reply_fn</b> in the main
 * thread. See threadpool_queue_work() for full documentation. */
static workqueue_entry_t *
workqueue_entry_new(workqueue_reply_t (*fn)(void*, void*),
                    void (*reply_fn)(void*),
                    void *arg)
{
  workqueue_entry_t *ent = tor_malloc_zero(sizeof(workqueue_entry_t));
  ent->fn = fn;
  ent->reply_fn = reply_fn;
  ent->arg = arg;
  ent->priority = WQ_PRI_HIGH;
  return ent;
}

#define workqueue_entry_free(ent) \
  FREE_AND_NULL(workqueue_entry_t, workqueue_entry_free_, (ent))

/**
 * Release all storage held in <b>ent</b>. Call only when <b>ent</b> is not on
 * any queue.
 */
static void
workqueue_entry_free_(workqueue_entry_t *ent)
{
  if (!ent)
    return;
  memset(ent, 0xf0, sizeof(*ent));
  tor_free(ent);
}

/**
 * Cancel a workqueue_entry_t that has been returned from
 * threadpool_queue_work.
 *
 * You must not call this function on any work whose reply function has been
 * executed in the main thread; that will cause undefined behavior (probably,
 * a crash).
 *
 * If the work is cancelled, this function returns the argument passed to the
 * work function. It is the caller's responsibility to free this storage.
 *
 * This function will have no effect if the worker thread has already executed
 * or begun to execute the work item. In that case, it will return NULL.
 */
void *
workqueue_entry_cancel(workqueue_entry_t *ent)
{
  int cancelled = 0;
  void *result = NULL;
  tor_mutex_acquire(&ent->on_pool->lock);
  workqueue_priority_t prio = ent->priority;
  if (ent->pending) {
    TOR_TAILQ_REMOVE(&ent->on_pool->work[prio], ent, next_work);
    cancelled = 1;
    result = ent->arg;
  }
  tor_mutex_release(&ent->on_pool->lock);

  if (cancelled) {
    workqueue_entry_free(ent);
  }
  return result;
}
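
/*
 * Illustrative sketch (not from the original file): cancelling queued work.
 * Because cancellation fails once a worker has started the job, the caller
 * must be prepared for either outcome. "job" and its helpers are
 * hypothetical.
 *
 *     workqueue_entry_t *ent = threadpool_queue_work(tp, my_work_fn,
 *                                                    my_reply_fn, job);
 *     ...
 *     void *arg = workqueue_entry_cancel(ent);
 *     if (arg) {
 *       job_free(arg);  // never ran; we own the argument again
 *     }  // else: a worker has it, and my_reply_fn will still run.
 */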

/** Return true iff <b>thread</b> has pending work in one of its pool's
 * queues, or needs to run the pool's update function because its generation
 * is out of date.
 *
 * The caller must hold the pool's lock. */
static int
worker_thread_has_work(workerthread_t *thread)
{
  unsigned i;
  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
    if (!TOR_TAILQ_EMPTY(&thread->in_pool->work[i]))
      return 1;
  }
  return thread->generation != thread->in_pool->generation;
}

/** Extract the next workqueue_entry_t from the thread's pool, removing
 * it from the relevant queues and marking it as non-pending.
 *
 * The caller must hold the lock. */
static workqueue_entry_t *
worker_thread_extract_next_work(workerthread_t *thread)
{
  threadpool_t *pool = thread->in_pool;
  work_tailq_t *queue = NULL, *this_queue;
  unsigned i;
  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
    this_queue = &pool->work[i];
    if (!TOR_TAILQ_EMPTY(this_queue)) {
      queue = this_queue;
      if (! crypto_fast_rng_one_in_n(get_thread_fast_rng(),
                                     thread->lower_priority_chance)) {
        /* Usually we'll just break now, so that we can get out of the loop
         * and use the queue where we found work. But with a small
         * probability, we'll keep looking for lower priority work, so that
         * we don't ignore our low-priority queues entirely. */
        break;
      }
    }
  }

  if (queue == NULL)
    return NULL;

  workqueue_entry_t *work = TOR_TAILQ_FIRST(queue);
  TOR_TAILQ_REMOVE(queue, work, next_work);
  work->pending = 0;
  return work;
}

/**
 * Main function for the worker thread.
 */
static void
worker_thread_main(void *thread_)
{
  workerthread_t *thread = thread_;
  threadpool_t *pool = thread->in_pool;
  workqueue_entry_t *work;
  workqueue_reply_t result;

  tor_mutex_acquire(&pool->lock);
  while (1) {
    /* lock must be held at this point. */
    while (worker_thread_has_work(thread)) {
      /* lock must be held at this point. */
      if (thread->in_pool->generation != thread->generation) {
        void *arg = thread->in_pool->update_args[thread->index];
        thread->in_pool->update_args[thread->index] = NULL;
        workqueue_reply_t (*update_fn)(void*,void*) =
          thread->in_pool->update_fn;
        thread->generation = thread->in_pool->generation;
        tor_mutex_release(&pool->lock);

        workqueue_reply_t r = update_fn(thread->state, arg);

        if (r != WQ_RPL_REPLY) {
          return;
        }

        tor_mutex_acquire(&pool->lock);
        continue;
      }
      work = worker_thread_extract_next_work(thread);
      if (BUG(work == NULL))
        break;
      tor_mutex_release(&pool->lock);

      /* We run the work function without holding the thread lock. This
       * is the main thread's first opportunity to give us more work. */
      result = work->fn(thread->state, work->arg);

      /* Queue the reply for the main thread. */
      queue_reply(thread->reply_queue, work);

      /* We may need to exit the thread. */
      if (result != WQ_RPL_REPLY) {
        return;
      }
      tor_mutex_acquire(&pool->lock);
    }
    /* At this point the lock is held, and there is no work in this thread's
     * queue. */

    /* TODO: support an idle-function */

    /* Okay. Now, wait till somebody has work for us. */
    if (tor_cond_wait(&pool->condition, &pool->lock, NULL) < 0) {
      log_warn(LD_GENERAL, "Fail tor_cond_wait.");
    }
  }
}

/** Put a reply on the reply queue. The reply must not currently be on
 * any thread's work queue. */
static void
queue_reply(replyqueue_t *queue, workqueue_entry_t *work)
{
  int was_empty;
  tor_mutex_acquire(&queue->lock);
  was_empty = TOR_TAILQ_EMPTY(&queue->answers);
  TOR_TAILQ_INSERT_TAIL(&queue->answers, work, next_work);
  tor_mutex_release(&queue->lock);

  if (was_empty) {
    if (queue->alert.alert_fn(queue->alert.write_fd) < 0) {
      /* XXXX complain! */
    }
  }
}

/** Allocate and start a new worker thread to use state object <b>state</b>,
 * and send responses to <b>replyqueue</b>. */
static workerthread_t *
workerthread_new(int32_t lower_priority_chance,
                 void *state, threadpool_t *pool, replyqueue_t *replyqueue)
{
  workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t));
  thr->state = state;
  thr->reply_queue = replyqueue;
  thr->in_pool = pool;
  thr->lower_priority_chance = lower_priority_chance;

  if (spawn_func(worker_thread_main, thr) < 0) {
    //LCOV_EXCL_START
    tor_assert_nonfatal_unreached();
    log_err(LD_GENERAL, "Can't launch worker thread.");
    tor_free(thr);
    return NULL;
    //LCOV_EXCL_STOP
  }

  return thr;
}

/**
 * Queue an item of work for a thread in a thread pool. The function
 * <b>fn</b> will be run in a worker thread, and will receive as arguments the
 * thread's state object, and the provided object <b>arg</b>. It must return
 * one of WQ_RPL_REPLY, WQ_RPL_ERROR, or WQ_RPL_SHUTDOWN.
 *
 * Regardless of its return value, the function <b>reply_fn</b> will later be
 * run in the main thread when it invokes replyqueue_process(), and will
 * receive as its argument the same <b>arg</b> object. It's the reply
 * function's responsibility to free the work object.
 *
 * On success, return a workqueue_entry_t object that can be passed to
 * workqueue_entry_cancel(). On failure, return NULL. (Failure is not
 * currently possible, but callers should check anyway.)
 *
 * Items are executed in a loose priority order -- each thread will usually
 * take from the queued work with the highest priority, but will occasionally
 * visit lower-priority queues to keep them from starving completely.
 *
 * Note that because of priorities and thread behavior, work items may not
 * be executed strictly in order.
 */
workqueue_entry_t *
threadpool_queue_work_priority(threadpool_t *pool,
                               workqueue_priority_t prio,
                               workqueue_reply_t (*fn)(void *, void *),
                               void (*reply_fn)(void *),
                               void *arg)
{
  tor_assert(((int)prio) >= WORKQUEUE_PRIORITY_FIRST &&
             ((int)prio) <= WORKQUEUE_PRIORITY_LAST);

  workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg);
  ent->on_pool = pool;
  ent->pending = 1;
  ent->priority = prio;

  tor_mutex_acquire(&pool->lock);

  TOR_TAILQ_INSERT_TAIL(&pool->work[prio], ent, next_work);

  tor_cond_signal_one(&pool->condition);

  tor_mutex_release(&pool->lock);

  return ent;
}

/** As threadpool_queue_work_priority(), but assumes WQ_PRI_HIGH */
workqueue_entry_t *
threadpool_queue_work(threadpool_t *pool,
                      workqueue_reply_t (*fn)(void *, void *),
                      void (*reply_fn)(void *),
                      void *arg)
{
  return threadpool_queue_work_priority(pool, WQ_PRI_HIGH, fn, reply_fn, arg);
}
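
/*
 * Illustrative sketch (not from the original file): what a work/reply pair
 * might look like. Returning WQ_RPL_REPLY tells the pool the worker thread
 * should keep running; all names below are hypothetical.
 *
 *     static workqueue_reply_t
 *     my_work_fn(void *state, void *arg)
 *     {
 *       my_job_t *job = arg;
 *       job->answer = compute(state, job->input);  // runs in a worker
 *       return WQ_RPL_REPLY;
 *     }
 *
 *     static void
 *     my_reply_fn(void *arg)
 *     {
 *       my_job_t *job = arg;
 *       use_answer(job->answer);  // runs in the main thread
 *       my_job_free(job);         // the reply fn must free the work object
 *     }
 */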

/**
 * Queue a copy of a work item for every thread in a pool. This can be used,
 * for example, to tell the threads to update some parameter in their states.
 *
 * Arguments are as for <b>threadpool_queue_work</b>, except that the
 * <b>arg</b> value is passed to <b>dup_fn</b> once for each thread, to
 * make a copy of it.
 *
 * UPDATE FUNCTIONS MUST BE IDEMPOTENT. We do not guarantee that every update
 * will be run. If a new update is scheduled before the old update finishes
 * running, then the new one will replace the old in any threads that haven't
 * run it yet.
 *
 * Return 0 on success, -1 on failure.
 */
int
threadpool_queue_update(threadpool_t *pool,
                        void *(*dup_fn)(void *),
                        workqueue_reply_t (*fn)(void *, void *),
                        void (*free_fn)(void *),
                        void *arg)
{
  int i, n_threads;
  void (*old_args_free_fn)(void *arg);
  void **old_args;
  void **new_args;

  tor_mutex_acquire(&pool->lock);
  n_threads = pool->n_threads;
  old_args = pool->update_args;
  old_args_free_fn = pool->free_update_arg_fn;

  new_args = tor_calloc(n_threads, sizeof(void*));
  for (i = 0; i < n_threads; ++i) {
    if (dup_fn)
      new_args[i] = dup_fn(arg);
    else
      new_args[i] = arg;
  }

  pool->update_args = new_args;
  pool->free_update_arg_fn = free_fn;
  pool->update_fn = fn;
  ++pool->generation;

  tor_cond_signal_all(&pool->condition);

  tor_mutex_release(&pool->lock);

  if (old_args) {
    for (i = 0; i < n_threads; ++i) {
      if (old_args[i] && old_args_free_fn)
        old_args_free_fn(old_args[i]);
    }
    tor_free(old_args);
  }

  return 0;
}
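
/*
 * Illustrative sketch (not from the original file): broadcasting a
 * parameter change to every worker's state. The update function must be
 * idempotent, since a later update may replace this one before some threads
 * have run it. All names below are hypothetical.
 *
 *     static workqueue_reply_t
 *     update_params_fn(void *state, void *arg)
 *     {
 *       my_state_t *st = state;
 *       st->params = *(const my_params_t *)arg;  // safe to run twice
 *       return WQ_RPL_REPLY;
 *     }
 *
 *     threadpool_queue_update(tp, dup_params, update_params_fn,
 *                             free_params, &new_params);
 */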

/** Don't have more than this many threads per pool. */
#define MAX_THREADS 1024

/** For half of our threads, choose lower priority queues with probability
 * 1/N for each of these values. Both are chosen somewhat arbitrarily. If
 * the probability 1/CHANCE_PERMISSIVE is too low, then we have a risk of
 * low-priority tasks stalling forever. If it's too high, we have a risk of
 * low-priority tasks grabbing half of the threads. */
#define CHANCE_PERMISSIVE 37
#define CHANCE_STRICT INT32_MAX
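
/* Concretely (clarifying note, not in the original file): a "permissive"
 * thread that finds work at some priority keeps scanning below it with
 * probability 1/37, while a "strict" thread (1/INT32_MAX) essentially
 * never does. */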

/** Launch threads until we have <b>n</b>. */
static int
threadpool_start_threads(threadpool_t *pool, int n)
{
  if (BUG(n < 0))
    return -1; // LCOV_EXCL_LINE
  if (n > MAX_THREADS)
    n = MAX_THREADS;

  tor_mutex_acquire(&pool->lock);

  if (pool->n_threads < n)
    pool->threads = tor_reallocarray(pool->threads,
                                     sizeof(workerthread_t*), n);

  while (pool->n_threads < n) {
    /* For half of our threads, we'll choose lower priorities permissively;
     * for the other half, we'll stick more strictly to higher priorities.
     * This keeps slow low-priority tasks from taking over completely. */
    int32_t chance = (pool->n_threads & 1) ? CHANCE_STRICT : CHANCE_PERMISSIVE;

    void *state = pool->new_thread_state_fn(pool->new_thread_state_arg);
    workerthread_t *thr = workerthread_new(chance,
                                           state, pool, pool->reply_queue);

    if (!thr) {
      //LCOV_EXCL_START
      tor_assert_nonfatal_unreached();
      pool->free_thread_state_fn(state);
      tor_mutex_release(&pool->lock);
      return -1;
      //LCOV_EXCL_STOP
    }
    thr->index = pool->n_threads;
    pool->threads[pool->n_threads++] = thr;
  }
  tor_mutex_release(&pool->lock);

  return 0;
}

/**
 * Construct a new thread pool with <b>n</b> worker threads, configured to
 * send their output to <b>replyqueue</b>. The threads' states will be
 * constructed with the <b>new_thread_state_fn</b> call, receiving <b>arg</b>
 * as its argument. When the threads close, they will call
 * <b>free_thread_state_fn</b> on their states.
 */
threadpool_t *
threadpool_new(int n_threads,
               replyqueue_t *replyqueue,
               void *(*new_thread_state_fn)(void*),
               void (*free_thread_state_fn)(void*),
               void *arg)
{
  threadpool_t *pool;
  pool = tor_malloc_zero(sizeof(threadpool_t));
  tor_mutex_init_nonrecursive(&pool->lock);
  tor_cond_init(&pool->condition);
  unsigned i;
  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
    TOR_TAILQ_INIT(&pool->work[i]);
  }

  pool->new_thread_state_fn = new_thread_state_fn;
  pool->new_thread_state_arg = arg;
  pool->free_thread_state_fn = free_thread_state_fn;
  pool->reply_queue = replyqueue;

  if (threadpool_start_threads(pool, n_threads) < 0) {
    //LCOV_EXCL_START
    tor_assert_nonfatal_unreached();
    tor_cond_uninit(&pool->condition);
    tor_mutex_uninit(&pool->lock);
    tor_free(pool);
    return NULL;
    //LCOV_EXCL_STOP
  }

  return pool;
}
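
/*
 * Illustrative sketch (not from the original file): a minimal per-thread
 * state constructor/destructor pair to hand to threadpool_new(). Each
 * worker gets its own state object, so the work function never needs to
 * lock it. "my_state_t" is hypothetical.
 *
 *     static void *
 *     my_state_new(void *arg)
 *     {
 *       (void)arg;
 *       return tor_malloc_zero(sizeof(my_state_t));
 *     }
 *
 *     static void
 *     my_state_free(void *state)
 *     {
 *       tor_free(state);
 *     }
 */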

/** Return the reply queue associated with a given thread pool. */
replyqueue_t *
threadpool_get_replyqueue(threadpool_t *tp)
{
  return tp->reply_queue;
}

/** Allocate a new reply queue. Reply queues are used to pass results from
 * worker threads to the main thread. Since the main thread is running an
 * IO-centric event loop, it needs to get woken up with means other than a
 * condition variable. */
replyqueue_t *
replyqueue_new(uint32_t alertsocks_flags)
{
  replyqueue_t *rq;

  rq = tor_malloc_zero(sizeof(replyqueue_t));
  if (alert_sockets_create(&rq->alert, alertsocks_flags) < 0) {
    //LCOV_EXCL_START
    tor_free(rq);
    return NULL;
    //LCOV_EXCL_STOP
  }

  tor_mutex_init(&rq->lock);
  TOR_TAILQ_INIT(&rq->answers);

  return rq;
}

/** Internal: Run from the libevent mainloop when there is work to handle in
 * the reply queue handler. */
static void
reply_event_cb(evutil_socket_t sock, short events, void *arg)
{
  threadpool_t *tp = arg;
  (void) sock;
  (void) events;
  replyqueue_process(tp->reply_queue);
  if (tp->reply_cb)
    tp->reply_cb(tp);
}

/** Register the threadpool <b>tp</b>'s reply queue with Tor's global
 * libevent mainloop. If <b>cb</b> is provided, it is run after
 * each time there is work to process from the reply queue. Return 0 on
 * success, -1 on failure.
 */
int
threadpool_register_reply_event(threadpool_t *tp,
                                void (*cb)(threadpool_t *tp))
{
  struct event_base *base = tor_libevent_get_base();

  if (tp->reply_event) {
    tor_event_free(tp->reply_event);
  }
  tp->reply_event = tor_event_new(base,
                                  tp->reply_queue->alert.read_fd,
                                  EV_READ|EV_PERSIST,
                                  reply_event_cb,
                                  tp);
  tor_assert(tp->reply_event);
  tp->reply_cb = cb;
  return event_add(tp->reply_event, NULL);
}
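
/*
 * Illustrative sketch (not from the original file): once registered, no
 * manual polling is needed; the alert socket wakes libevent, which runs
 * reply_event_cb, which drains the queue. The callback is optional:
 *
 *     if (threadpool_register_reply_event(tp, NULL) < 0) {
 *       // hypothetical handling: couldn't hook the pool into the mainloop
 *       log_warn(LD_GENERAL, "Couldn't register reply event.");
 *     }
 */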

/**
 * Process all pending replies on a reply queue. The main thread should call
 * this function every time the socket returned by replyqueue_get_socket() is
 * readable.
 */
void
replyqueue_process(replyqueue_t *queue)
{
  int r = queue->alert.drain_fn(queue->alert.read_fd);
  if (r < 0) {
    //LCOV_EXCL_START
    static ratelim_t warn_limit = RATELIM_INIT(7200);
    log_fn_ratelim(&warn_limit, LOG_WARN, LD_GENERAL,
                   "Failure from drain_fd: %s",
                   tor_socket_strerror(-r));
    //LCOV_EXCL_STOP
  }

  tor_mutex_acquire(&queue->lock);
  while (!TOR_TAILQ_EMPTY(&queue->answers)) {
    /* lock must be held at this point.*/
    workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers);
    TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
    tor_mutex_release(&queue->lock);
    work->on_pool = NULL;

    work->reply_fn(work->arg);
    workqueue_entry_free(work);

    tor_mutex_acquire(&queue->lock);
  }

  tor_mutex_release(&queue->lock);
}