LCOV - code coverage report
Current view: top level - lib/evloop - workqueue.c (source / functions)
Test: lcov.info
Date: 2021-11-24 03:28:48
                  Hit    Total    Coverage
Lines:            183    194      94.3 %
Functions:         17     18      94.4 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2013-2015, The Tor Project, Inc. */
       3             : /* See LICENSE for licensing information */
       4             : 
       5             : /**
       6             :  * \file workqueue.c
       7             :  *
       8             :  * \brief Implements worker threads, queues of work for them, and mechanisms
       9             :  * for them to send answers back to the main thread.
      10             :  *
      11             :  * The main structure here is a threadpool_t : it manages a set of worker
      12             :  * threads, a queue of pending work, and a reply queue.  Every piece of work
      13             :  * is a workqueue_entry_t, containing data to process and a function to
      14             :  * process it with.
      15             :  *
      16             :  * The main thread informs the worker threads of pending work by using a
      17             :  * condition variable.  The workers inform the main process of completed work
      18             :  * by using an alert_sockets_t object, as implemented in net/alertsock.c.
      19             :  *
      20             :  * The main thread can also queue an "update" that will be handled by all the
      21             :  * workers.  This is useful for updating state that all the workers share.
      22             :  *
      23             :  * In Tor today, there is only one thread pool, used in cpuworker.c.
      24             :  */
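
  To make the pieces above concrete, here is a minimal usage sketch. It is
  hypothetical (not taken from cpuworker.c): the worker function, reply
  function, per-thread state helpers, the pool size, and the 0 passed as
  replyqueue_new()'s alert-socket flags are all illustrative assumptions.

    /* Hypothetical sketch -- not part of workqueue.c. */
    static void *
    example_state_new(void *arg)
    {
      (void)arg;                      /* threadpool_new()'s last argument */
      return tor_malloc_zero(64);     /* stand-in for real per-thread state */
    }
    static void
    example_state_free(void *state)
    {
      tor_free(state);
    }
    /* Runs in a worker thread. */
    static workqueue_reply_t
    example_work_fn(void *state, void *arg)
    {
      (void)state;
      int *n = arg;
      *n = (*n) * (*n);               /* the expensive part, off the main thread */
      return WQ_RPL_REPLY;            /* keep this worker running */
    }
    /* Runs later in the main thread, from replyqueue_process(). */
    static void
    example_reply_fn(void *arg)
    {
      int *n = arg;
      log_info(LD_GENERAL, "worker computed %d", *n);
      tor_free(n);                    /* the reply function owns the argument */
    }
    static void
    example_setup(void)
    {
      replyqueue_t *rq = replyqueue_new(0);        /* 0: assumed default flags */
      if (!rq)
        return;
      threadpool_t *pool = threadpool_new(4, rq,
                                          example_state_new,
                                          example_state_free, NULL);
      threadpool_register_reply_event(pool, NULL); /* drain replies in mainloop */
      int *n = tor_malloc(sizeof(int));
      *n = 7;
      threadpool_queue_work(pool, example_work_fn, example_reply_fn, n);
    }
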
      25             : 
      26             : #include "orconfig.h"
      27             : #include "lib/evloop/compat_libevent.h"
      28             : #include "lib/evloop/workqueue.h"
      29             : 
      30             : #include "lib/crypt_ops/crypto_rand.h"
      31             : #include "lib/intmath/weakrng.h"
      32             : #include "lib/log/ratelim.h"
      33             : #include "lib/log/log.h"
      34             : #include "lib/log/util_bug.h"
      35             : #include "lib/net/alertsock.h"
      36             : #include "lib/net/socket.h"
      37             : #include "lib/thread/threads.h"
      38             : 
      39             : #include "ext/tor_queue.h"
      40             : #include <event2/event.h>
      41             : #include <string.h>
      42             : 
      43             : #define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH
      44             : #define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW
      45             : #define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1)
      46             : 
      47             : TOR_TAILQ_HEAD(work_tailq_t, workqueue_entry_t);
      48             : typedef struct work_tailq_t work_tailq_t;
      49             : 
      50             : struct threadpool_t {
      51             :   /** An array of pointers to workerthread_t: one for each running worker
      52             :    * thread. */
      53             :   struct workerthread_t **threads;
      54             : 
      55             :   /** Condition variable that we wait on when we have no work, and which
      56             :    * gets signaled when our queue becomes nonempty. */
      57             :   tor_cond_t condition;
      58             :   /** Queues of pending work that we have to do. The queue with priority
      59             :    * <b>p</b> is work[p]. */
      60             :   work_tailq_t work[WORKQUEUE_N_PRIORITIES];
      61             : 
      62             :   /** The current 'update generation' of the threadpool.  Any thread that is
      63             :    * at an earlier generation needs to run the update function. */
      64             :   unsigned generation;
      65             : 
      66             :   /** Function that should be run for updates on each thread. */
      67             :   workqueue_reply_t (*update_fn)(void *, void *);
      68             :   /** Function to free update arguments if they can't be run. */
      69             :   void (*free_update_arg_fn)(void *);
      70             :   /** Array of n_threads update arguments. */
      71             :   void **update_args;
      72             :   /** Event to notice when another thread has sent a reply. */
      73             :   struct event *reply_event;
      74             :   void (*reply_cb)(threadpool_t *);
      75             : 
      76             :   /** Number of elements in threads. */
      77             :   int n_threads;
      78             :   /** Mutex to protect all the above fields. */
      79             :   tor_mutex_t lock;
      80             : 
      81             :   /** A reply queue to use when constructing new threads. */
      82             :   replyqueue_t *reply_queue;
      83             : 
      84             :   /** Functions used to allocate and free thread state. */
      85             :   void *(*new_thread_state_fn)(void*);
      86             :   void (*free_thread_state_fn)(void*);
      87             :   void *new_thread_state_arg;
      88             : };
      89             : 
      90             : /** Used to put a workqueue_priority_t value into a bitfield. */
      91             : #define workqueue_priority_bitfield_t ENUM_BF(workqueue_priority_t)
      92             : /** Number of bits needed to hold all legal values of workqueue_priority_t */
      93             : #define WORKQUEUE_PRIORITY_BITS 2
      94             : 
      95             : struct workqueue_entry_t {
      96             :   /** The next workqueue_entry_t that's pending on the same thread or
      97             :    * reply queue. */
      98             :   TOR_TAILQ_ENTRY(workqueue_entry_t) next_work;
      99             :   /** The threadpool to which this workqueue_entry_t was assigned. This field
     100             :    * is set when the workqueue_entry_t is created, and won't be cleared until
     101             :    * after it's handled in the main thread. */
     102             :   struct threadpool_t *on_pool;
     103             :   /** True iff this entry is waiting for a worker to start processing it. */
     104             :   uint8_t pending;
     105             :   /** Priority of this entry. */
     106             :   workqueue_priority_bitfield_t priority : WORKQUEUE_PRIORITY_BITS;
     107             :   /** Function to run in the worker thread. */
     108             :   workqueue_reply_t (*fn)(void *state, void *arg);
     109             :   /** Function to run while processing the reply queue. */
     110             :   void (*reply_fn)(void *arg);
     111             :   /** Argument for the above functions. */
     112             :   void *arg;
     113             : };
     114             : 
     115             : struct replyqueue_t {
     116             :   /** Mutex to protect the answers field */
     117             :   tor_mutex_t lock;
     118             :   /** Doubly-linked list of answers that the reply queue needs to handle. */
     119             :   TOR_TAILQ_HEAD(, workqueue_entry_t) answers;
     120             : 
     121             :   /** Mechanism to wake up the main thread when it is receiving answers. */
     122             :   alert_sockets_t alert;
     123             : };
     124             : 
     125             : /** A worker thread represents a single thread in a thread pool. */
     126             : typedef struct workerthread_t {
     127             :   /** Which thread is this?  In range 0..in_pool->n_threads-1 */
     128             :   int index;
     129             :   /** The pool this thread is a part of. */
     130             :   struct threadpool_t *in_pool;
     131             :   /** User-supplied state field that we pass to the worker functions of each
     132             :    * work item. */
     133             :   void *state;
     134             :   /** Reply queue to which we pass our results. */
     135             :   replyqueue_t *reply_queue;
     136             :   /** The current update generation of this thread */
     137             :   unsigned generation;
     138             :   /** One over the probability of taking work from a lower-priority queue. */
     139             :   int32_t lower_priority_chance;
     140             : } workerthread_t;
     141             : 
     142             : static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work);
     143             : 
     144             : /** Allocate and return a new workqueue_entry_t, set up to run the function
     145             :  * <b>fn</b> in the worker thread, and <b>reply_fn</b> in the main
     146             :  * thread. See threadpool_queue_work() for full documentation. */
     147             : static workqueue_entry_t *
     148       70007 : workqueue_entry_new(workqueue_reply_t (*fn)(void*, void*),
     149             :                     void (*reply_fn)(void*),
     150             :                     void *arg)
     151             : {
     152       70007 :   workqueue_entry_t *ent = tor_malloc_zero(sizeof(workqueue_entry_t));
     153       70007 :   ent->fn = fn;
     154       70007 :   ent->reply_fn = reply_fn;
     155       70007 :   ent->arg = arg;
     156       70007 :   ent->priority = WQ_PRI_HIGH;
     157       70007 :   return ent;
     158             : }
     159             : 
     160             : #define workqueue_entry_free(ent) \
     161             :   FREE_AND_NULL(workqueue_entry_t, workqueue_entry_free_, (ent))
     162             : 
     163             : /**
     164             :  * Release all storage held in <b>ent</b>. Call only when <b>ent</b> is not on
     165             :  * any queue.
     166             :  */
     167             : static void
     168       70000 : workqueue_entry_free_(workqueue_entry_t *ent)
     169             : {
     170       70000 :   if (!ent)
     171             :     return;
     172       70000 :   memset(ent, 0xf0, sizeof(*ent));
     173       70000 :   tor_free(ent);
     174             : }
     175             : 
     176             : /**
     177             :  * Cancel a workqueue_entry_t that has been returned from
     178             :  * threadpool_queue_work.
     179             :  *
     180             :  * You must not call this function on any work whose reply function has been
     181             :  * executed in the main thread; that will cause undefined behavior (probably,
     182             :  * a crash).
     183             :  *
     184             :  * If the work is cancelled, this function returns the argument passed to the
     185             :  * work function. It is the caller's responsibility to free this storage.
     186             :  *
     187             :  * This function will have no effect if the worker thread has already executed
     188             :  * or begun to execute the work item.  In that case, it will return NULL.
     189             :  */
     190             : void *
     191          10 : workqueue_entry_cancel(workqueue_entry_t *ent)
     192             : {
     193          10 :   int cancelled = 0;
     194          10 :   void *result = NULL;
     195          10 :   tor_mutex_acquire(&ent->on_pool->lock);
     196          10 :   workqueue_priority_t prio = ent->priority;
     197          10 :   if (ent->pending) {
     198           9 :     TOR_TAILQ_REMOVE(&ent->on_pool->work[prio], ent, next_work);
     199           9 :     cancelled = 1;
     200           9 :     result = ent->arg;
     201             :   }
     202          10 :   tor_mutex_release(&ent->on_pool->lock);
     203             : 
     204          10 :   if (cancelled) {
     205           9 :     workqueue_entry_free(ent);
     206             :   }
     207          10 :   return result;
     208             : }
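
  As a hypothetical illustration of this contract (reusing the example helpers
  from the sketch near the top of the file): cancellation succeeds only while
  the entry is still pending, and the caller reclaims ownership of the
  argument only in that case.

    /* Hypothetical sketch -- cancelling work that no worker has started yet. */
    static void
    example_maybe_cancel(threadpool_t *pool)
    {
      int *n = tor_malloc(sizeof(int));
      *n = 7;
      workqueue_entry_t *ent =
        threadpool_queue_work(pool, example_work_fn, example_reply_fn, n);

      /* ... later, before the reply has been processed in the main thread ... */
      void *orig_arg = workqueue_entry_cancel(ent);
      if (orig_arg) {
        tor_free(orig_arg);   /* never ran: we own the argument again */
      }
      /* Otherwise a worker already took it; example_reply_fn() will free it. */
    }
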
     209             : 
     210             : /** Return true iff <b>thread</b> has work to do: either a pending entry in
     211             :  * one of its pool's work queues, or an update that it has not yet run.
     212             :  * The caller must hold the pool's lock. */
     213             : static int
     214       70276 : worker_thread_has_work(workerthread_t *thread)
     215             : {
     216       70276 :   unsigned i;
     217       84371 :   for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
     218       84130 :     if (!TOR_TAILQ_EMPTY(&thread->in_pool->work[i]))
     219             :         return 1;
     220             :   }
     221         241 :   return thread->generation != thread->in_pool->generation;
     222             : }
     223             : 
     224             : /** Extract the next workqueue_entry_t from the thread's pool, removing
     225             :  * it from the relevant queues and marking it as non-pending.
     226             :  *
     227             :  * The caller must hold the lock. */
     228             : static workqueue_entry_t *
     229       69991 : worker_thread_extract_next_work(workerthread_t *thread)
     230             : {
     231       69991 :   threadpool_t *pool = thread->in_pool;
     232       69991 :   work_tailq_t *queue = NULL, *this_queue;
     233       69991 :   unsigned i;
     234       84502 :   for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
     235       84311 :     this_queue = &pool->work[i];
     236       84311 :     if (!TOR_TAILQ_EMPTY(this_queue)) {
     237       70746 :       queue = this_queue;
     238       70746 :       if (! crypto_fast_rng_one_in_n(get_thread_fast_rng(),
     239             :                                      thread->lower_priority_chance)) {
     240             :         /* Usually we'll just break now, so that we can get out of the loop
     241             :          * and use the queue where we found work. But with a small
     242             :          * probability, we'll keep looking for lower priority work, so that
     243             :          * we don't ignore our low-priority queues entirely. */
     244             :         break;
     245             :       }
     246             :     }
     247             :   }
     248             : 
     249       69991 :   if (queue == NULL)
     250             :     return NULL;
     251             : 
     252       69991 :   workqueue_entry_t *work = TOR_TAILQ_FIRST(queue);
     253       69991 :   TOR_TAILQ_REMOVE(queue, work, next_work);
     254       69991 :   work->pending = 0;
     255       69991 :   return work;
     256             : }
     257             : 
     258             : /**
     259             :  * Main function for the worker thread.
     260             :  */
     261             : static void
     262          56 : worker_thread_main(void *thread_)
     263             : {
     264          56 :   workerthread_t *thread = thread_;
     265          56 :   threadpool_t *pool = thread->in_pool;
     266          56 :   workqueue_entry_t *work;
     267          56 :   workqueue_reply_t result;
     268             : 
     269          56 :   tor_mutex_acquire(&pool->lock);
     270             :   while (1) {
     271             :     /* lock must be held at this point. */
     272       70276 :     while (worker_thread_has_work(thread)) {
     273             :       /* lock must be held at this point. */
     274       70047 :       if (thread->in_pool->generation != thread->generation) {
     275          56 :         void *arg = thread->in_pool->update_args[thread->index];
     276          56 :         thread->in_pool->update_args[thread->index] = NULL;
     277          56 :         workqueue_reply_t (*update_fn)(void*,void*) =
     278          56 :             thread->in_pool->update_fn;
     279          56 :         thread->generation = thread->in_pool->generation;
     280          56 :         tor_mutex_release(&pool->lock);
     281             : 
     282          55 :         workqueue_reply_t r = update_fn(thread->state, arg);
     283             : 
     284          52 :         if (r != WQ_RPL_REPLY) {
     285             :           return;
     286             :         }
     287             : 
     288           0 :         tor_mutex_acquire(&pool->lock);
     289           0 :         continue;
     290             :       }
     291       69991 :       work = worker_thread_extract_next_work(thread);
     292       69991 :       if (BUG(work == NULL))
     293             :         break;
     294       69991 :       tor_mutex_release(&pool->lock);
     295             : 
     296             :       /* We run the work function without holding the thread lock. This
     297             :        * is the main thread's first opportunity to give us more work. */
     298       69983 :       result = work->fn(thread->state, work->arg);
     299             : 
     300             :       /* Queue the reply for the main thread. */
     301       68893 :       queue_reply(thread->reply_queue, work);
     302             : 
     303             :       /* We may need to exit the thread. */
     304       69927 :       if (result != WQ_RPL_REPLY) {
     305             :         return;
     306             :       }
     307       69927 :       tor_mutex_acquire(&pool->lock);
     308             :     }
     309             :     /* At this point the lock is held, and there is no work in this thread's
     310             :      * queue. */
     311             : 
     312             :     /* TODO: support an idle-function */
     313             : 
     314             :     /* Okay. Now, wait till somebody has work for us. */
     315         229 :     if (tor_cond_wait(&pool->condition, &pool->lock, NULL) < 0) {
     316           0 :       log_warn(LD_GENERAL, "Fail tor_cond_wait.");
     317             :     }
     318             :   }
     319             : }
     320             : 
     321             : /** Put a reply on the reply queue.  The reply must not currently be on
     322             :  * any thread's work queue. */
     323             : static void
     324       68756 : queue_reply(replyqueue_t *queue, workqueue_entry_t *work)
     325             : {
     326       68756 :   int was_empty;
     327       68756 :   tor_mutex_acquire(&queue->lock);
     328       69991 :   was_empty = TOR_TAILQ_EMPTY(&queue->answers);
     329       69991 :   TOR_TAILQ_INSERT_TAIL(&queue->answers, work, next_work);
     330       69991 :   tor_mutex_release(&queue->lock);
     331             : 
     332       69986 :   if (was_empty) {
     333       19050 :     if (queue->alert.alert_fn(queue->alert.write_fd) < 0) {
     334             :       /* XXXX complain! */
     335       69986 :     }
     336             :   }
     337       69986 : }
     338             : 
     339             : /** Allocate and start a new worker thread to use state object <b>state</b>,
     340             :  * and send responses to <b>replyqueue</b>. */
     341             : static workerthread_t *
     342          56 : workerthread_new(int32_t lower_priority_chance,
     343             :                  void *state, threadpool_t *pool, replyqueue_t *replyqueue)
     344             : {
     345          56 :   workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t));
     346          56 :   thr->state = state;
     347          56 :   thr->reply_queue = replyqueue;
     348          56 :   thr->in_pool = pool;
     349          56 :   thr->lower_priority_chance = lower_priority_chance;
     350             : 
     351          56 :   if (spawn_func(worker_thread_main, thr) < 0) {
     352             :     //LCOV_EXCL_START
     353             :     tor_assert_nonfatal_unreached();
     354             :     log_err(LD_GENERAL, "Can't launch worker thread.");
     355             :     tor_free(thr);
     356             :     return NULL;
     357             :     //LCOV_EXCL_STOP
     358             :   }
     359             : 
     360             :   return thr;
     361             : }
     362             : 
     363             : /**
     364             :  * Queue an item of work for a thread in a thread pool.  The function
     365             :  * <b>fn</b> will be run in a worker thread, and will receive as arguments the
     366             :  * thread's state object, and the provided object <b>arg</b>. It must return
     367             :  * one of WQ_RPL_REPLY, WQ_RPL_ERROR, or WQ_RPL_SHUTDOWN.
     368             :  *
     369             :  * Regardless of its return value, the function <b>reply_fn</b> will later be
     370             :  * run in the main thread when it invokes replyqueue_process(), and will
     371             :  * receive as its argument the same <b>arg</b> object.  It's the reply
     372             :  * function's responsibility to free the work object.
     373             :  *
     374             :  * On success, return a workqueue_entry_t object that can be passed to
     375             :  * workqueue_entry_cancel(). On failure, return NULL.  (Failure is not
     376             :  * currently possible, but callers should check anyway.)
     377             :  *
     378             :  * Items are executed in a loose priority order -- each thread will usually
     379             :  * take from the queued work with the highest priority, but will occasionally
     380             :  * visit lower-priority queues to keep them from starving completely.
     381             :  *
     382             :  * Note that because of priorities and thread behavior, work items may not
     383             :  * be executed strictly in order.
     384             :  */
     385             : workqueue_entry_t *
     386       70007 : threadpool_queue_work_priority(threadpool_t *pool,
     387             :                                workqueue_priority_t prio,
     388             :                                workqueue_reply_t (*fn)(void *, void *),
     389             :                                void (*reply_fn)(void *),
     390             :                                void *arg)
     391             : {
     392       70007 :   tor_assert(((int)prio) >= WORKQUEUE_PRIORITY_FIRST &&
     393             :              ((int)prio) <= WORKQUEUE_PRIORITY_LAST);
     394             : 
     395       70007 :   workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg);
     396       70007 :   ent->on_pool = pool;
     397       70007 :   ent->pending = 1;
     398       70007 :   ent->priority = prio;
     399             : 
     400       70007 :   tor_mutex_acquire(&pool->lock);
     401             : 
     402       70007 :   TOR_TAILQ_INSERT_TAIL(&pool->work[prio], ent, next_work);
     403             : 
     404       70007 :   tor_cond_signal_one(&pool->condition);
     405             : 
     406       70007 :   tor_mutex_release(&pool->lock);
     407             : 
     408       70007 :   return ent;
     409             : }
     410             : 
     411             : /** As threadpool_queue_work_priority(), but assumes WQ_PRI_HIGH */
     412             : workqueue_entry_t *
     413       55879 : threadpool_queue_work(threadpool_t *pool,
     414             :                       workqueue_reply_t (*fn)(void *, void *),
     415             :                       void (*reply_fn)(void *),
     416             :                       void *arg)
     417             : {
     418       55879 :   return threadpool_queue_work_priority(pool, WQ_PRI_HIGH, fn, reply_fn, arg);
     419             : }
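
  A small hypothetical example of the priority variant (again reusing the
  example helpers from the earlier sketch): low-priority work will usually run
  after any queued high-priority work, but the loose ordering described above
  means that is not guaranteed.

    /* Hypothetical sketch -- queueing background work at low priority. */
    static void
    example_queue_background(threadpool_t *pool, int *job)
    {
      workqueue_entry_t *ent =
        threadpool_queue_work_priority(pool, WQ_PRI_LOW,
                                       example_work_fn, example_reply_fn, job);
      if (!ent) {
        /* Not currently possible, but the interface reserves NULL for failure. */
        tor_free(job);
      }
    }
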
     420             : 
     421             : /**
     422             :  * Queue a copy of a work item for every thread in a pool.  This can be used,
     423             :  * for example, to tell the threads to update some parameter in their states.
     424             :  *
     425             :  * Arguments are as for <b>threadpool_queue_work</b>, except that the
     426             :  * <b>arg</b> value is passed to <b>dup_fn</b> once per each thread to
     427             :  * make a copy of it.
     428             :  *
     429             :  * UPDATE FUNCTIONS MUST BE IDEMPOTENT.  We do not guarantee that every update
     430             :  * will be run.  If a new update is scheduled before the old update finishes
     431             :  * running, then the new will replace the old in any threads that haven't run
     432             :  * it yet.
     433             :  *
     434             :  * Return 0 on success, -1 on failure.
     435             :  */
     436             : int
     437           7 : threadpool_queue_update(threadpool_t *pool,
     438             :                          void *(*dup_fn)(void *),
     439             :                          workqueue_reply_t (*fn)(void *, void *),
     440             :                          void (*free_fn)(void *),
     441             :                          void *arg)
     442             : {
     443           7 :   int i, n_threads;
     444           7 :   void (*old_args_free_fn)(void *arg);
     445           7 :   void **old_args;
     446           7 :   void **new_args;
     447             : 
     448           7 :   tor_mutex_acquire(&pool->lock);
     449           7 :   n_threads = pool->n_threads;
     450           7 :   old_args = pool->update_args;
     451           7 :   old_args_free_fn = pool->free_update_arg_fn;
     452             : 
     453           7 :   new_args = tor_calloc(n_threads, sizeof(void*));
     454          70 :   for (i = 0; i < n_threads; ++i) {
     455          56 :     if (dup_fn)
     456           0 :       new_args[i] = dup_fn(arg);
     457             :     else
     458          56 :       new_args[i] = arg;
     459             :   }
     460             : 
     461           7 :   pool->update_args = new_args;
     462           7 :   pool->free_update_arg_fn = free_fn;
     463           7 :   pool->update_fn = fn;
     464           7 :   ++pool->generation;
     465             : 
     466           7 :   tor_cond_signal_all(&pool->condition);
     467             : 
     468           7 :   tor_mutex_release(&pool->lock);
     469             : 
     470           7 :   if (old_args) {
     471           0 :     for (i = 0; i < n_threads; ++i) {
     472           0 :       if (old_args[i] && old_args_free_fn)
     473           0 :         old_args_free_fn(old_args[i]);
     474             :     }
     475           0 :     tor_free(old_args);
     476             :   }
     477             : 
     478           7 :   return 0;
     479             : }
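
  Here is a hypothetical sketch of the update mechanism: a dup function gives
  each thread its own copy of a parameter, the update function applies (and
  frees) that copy in its thread, and the free function reclaims copies that
  are superseded before they run. The parameter struct and helper names are
  invented.

    /* Hypothetical sketch -- broadcasting a new setting to every worker. */
    typedef struct example_param_t {
      int level;
    } example_param_t;

    static void *
    example_param_dup(void *arg)
    {
      example_param_t *copy = tor_malloc_zero(sizeof(example_param_t));
      memcpy(copy, arg, sizeof(example_param_t));
      return copy;
    }
    static void
    example_param_free(void *arg)
    {
      tor_free(arg);
    }
    /* Runs once in each worker thread; must be idempotent. */
    static workqueue_reply_t
    example_update_fn(void *state, void *arg)
    {
      (void)state;
      example_param_t *param = arg;
      (void)param->level;            /* apply the new setting to this thread */
      tor_free(param);               /* each thread owns its own copy */
      return WQ_RPL_REPLY;           /* keep the worker running */
    }
    static int
    example_broadcast(threadpool_t *pool)
    {
      example_param_t param = { .level = 3 };
      /* The dup function copies &param for each thread before this call
       * returns, so passing a stack object here is safe. */
      return threadpool_queue_update(pool, example_param_dup, example_update_fn,
                                     example_param_free, &param);
    }
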
     480             : 
     481             : /** Don't have more than this many threads per pool. */
     482             : #define MAX_THREADS 1024
     483             : 
     484             : /** For half of our threads, choose lower priority queues with probability
     485             :  * 1/N for each of these values. Both are chosen somewhat arbitrarily.  If
     486             :  * CHANCE_PERMISSIVE is too low, then we have a risk of low-priority tasks
     487             :  * stalling forever.  If it's too high, we have a risk of low-priority tasks
     488             :  * grabbing half of the threads. */
     489             : #define CHANCE_PERMISSIVE 37
     490             : #define CHANCE_STRICT INT32_MAX
     491             : 
     492             : /** Launch threads until we have <b>n</b>. */
     493             : static int
     494           7 : threadpool_start_threads(threadpool_t *pool, int n)
     495             : {
     496           7 :   if (BUG(n < 0))
     497             :     return -1; // LCOV_EXCL_LINE
     498           7 :   if (n > MAX_THREADS)
     499             :     n = MAX_THREADS;
     500             : 
     501           7 :   tor_mutex_acquire(&pool->lock);
     502             : 
     503           7 :   if (pool->n_threads < n)
     504           7 :     pool->threads = tor_reallocarray(pool->threads,
     505             :                                      sizeof(workerthread_t*), n);
     506             : 
     507          63 :   while (pool->n_threads < n) {
     508             :     /* For half of our threads, we'll choose lower priorities permissively;
     509             :      * for the other half, we'll stick more strictly to higher priorities.
     510             :      * This keeps slow low-priority tasks from taking over completely. */
     511          56 :     int32_t chance = (pool->n_threads & 1) ? CHANCE_STRICT : CHANCE_PERMISSIVE;
     512             : 
     513          56 :     void *state = pool->new_thread_state_fn(pool->new_thread_state_arg);
     514          56 :     workerthread_t *thr = workerthread_new(chance,
     515             :                                            state, pool, pool->reply_queue);
     516             : 
     517          56 :     if (!thr) {
     518             :       //LCOV_EXCL_START
     519             :       tor_assert_nonfatal_unreached();
     520             :       pool->free_thread_state_fn(state);
     521             :       tor_mutex_release(&pool->lock);
     522             :       return -1;
     523             :       //LCOV_EXCL_STOP
     524             :     }
     525          56 :     thr->index = pool->n_threads;
     526          56 :     pool->threads[pool->n_threads++] = thr;
     527             :   }
     528           7 :   tor_mutex_release(&pool->lock);
     529             : 
     530           7 :   return 0;
     531             : }
     532             : 
     533             : /**
     534             :  * Construct a new thread pool with <b>n</b> worker threads, configured to
     535             :  * send their output to <b>replyqueue</b>.  The threads' states will be
     536             :  * constructed with the <b>new_thread_state_fn</b> call, receiving <b>arg</b>
     537             :  * as its argument.  When the threads close, they will call
     538             :  * <b>free_thread_state_fn</b> on their states.
     539             :  */
     540             : threadpool_t *
     541           7 : threadpool_new(int n_threads,
     542             :                replyqueue_t *replyqueue,
     543             :                void *(*new_thread_state_fn)(void*),
     544             :                void (*free_thread_state_fn)(void*),
     545             :                void *arg)
     546             : {
     547           7 :   threadpool_t *pool;
     548           7 :   pool = tor_malloc_zero(sizeof(threadpool_t));
     549           7 :   tor_mutex_init_nonrecursive(&pool->lock);
     550           7 :   tor_cond_init(&pool->condition);
     551           7 :   unsigned i;
     552          35 :   for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
     553          21 :     TOR_TAILQ_INIT(&pool->work[i]);
     554             :   }
     555             : 
     556           7 :   pool->new_thread_state_fn = new_thread_state_fn;
     557           7 :   pool->new_thread_state_arg = arg;
     558           7 :   pool->free_thread_state_fn = free_thread_state_fn;
     559           7 :   pool->reply_queue = replyqueue;
     560             : 
     561           7 :   if (threadpool_start_threads(pool, n_threads) < 0) {
     562             :     //LCOV_EXCL_START
     563             :     tor_assert_nonfatal_unreached();
     564             :     tor_cond_uninit(&pool->condition);
     565             :     tor_mutex_uninit(&pool->lock);
     566             :     tor_free(pool);
     567             :     return NULL;
     568             :     //LCOV_EXCL_STOP
     569             :   }
     570             : 
     571             :   return pool;
     572             : }
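
  Because each worker keeps its own state object, workers can use scratch
  space without any locking. Below is a hypothetical sketch of a state
  constructor/destructor pair (struct and field names invented) that could be
  passed to threadpool_new().

    /* Hypothetical per-thread state: private to one worker, so no locks needed. */
    typedef struct example_thread_state_t {
      uint8_t scratch[4096];
      unsigned jobs_done;
    } example_thread_state_t;

    static void *
    example_thread_state_new(void *arg)
    {
      (void)arg;                /* the arg passed to threadpool_new() */
      return tor_malloc_zero(sizeof(example_thread_state_t));
    }
    static void
    example_thread_state_free(void *state)
    {
      tor_free(state);
    }
    static threadpool_t *
    example_pool_new(replyqueue_t *rq)
    {
      return threadpool_new(2, rq,
                            example_thread_state_new,
                            example_thread_state_free,
                            NULL);
    }
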
     573             : 
     574             : /** Return the reply queue associated with a given thread pool. */
     575             : replyqueue_t *
     576           0 : threadpool_get_replyqueue(threadpool_t *tp)
     577             : {
     578           0 :   return tp->reply_queue;
     579             : }
     580             : 
     581             : /** Allocate a new reply queue.  Reply queues are used to pass results from
     582             :  * worker threads to the main thread.  Since the main thread is running an
     583             :  * IO-centric event loop, it needs to be woken up by means other than a
     584             :  * condition variable. */
     585             : replyqueue_t *
     586           7 : replyqueue_new(uint32_t alertsocks_flags)
     587             : {
     588           7 :   replyqueue_t *rq;
     589             : 
     590           7 :   rq = tor_malloc_zero(sizeof(replyqueue_t));
     591           7 :   if (alert_sockets_create(&rq->alert, alertsocks_flags) < 0) {
     592             :     //LCOV_EXCL_START
     593             :     tor_free(rq);
     594             :     return NULL;
     595             :     //LCOV_EXCL_STOP
     596             :   }
     597             : 
     598           7 :   tor_mutex_init(&rq->lock);
     599           7 :   TOR_TAILQ_INIT(&rq->answers);
     600             : 
     601           7 :   return rq;
     602             : }
     603             : 
     604             : /** Internal: Run from the libevent mainloop when there is work to handle in
     605             :  * the reply queue. */
     606             : static void
     607       18995 : reply_event_cb(evutil_socket_t sock, short events, void *arg)
     608             : {
     609       18995 :   threadpool_t *tp = arg;
     610       18995 :   (void) sock;
     611       18995 :   (void) events;
     612       18995 :   replyqueue_process(tp->reply_queue);
     613       18995 :   if (tp->reply_cb)
     614       18995 :     tp->reply_cb(tp);
     615       18995 : }
     616             : 
     617             : /** Register the threadpool <b>tp</b>'s reply queue with Tor's global
     618             :  * libevent mainloop. If <b>cb</b> is provided, it is run each time after
     619             :  * the reply queue's pending work has been processed. Return 0 on
     620             :  * success, -1 on failure.
     621             :  */
     622             : int
     623           7 : threadpool_register_reply_event(threadpool_t *tp,
     624             :                                 void (*cb)(threadpool_t *tp))
     625             : {
     626           7 :   struct event_base *base = tor_libevent_get_base();
     627             : 
     628           7 :   if (tp->reply_event) {
     629           0 :     tor_event_free(tp->reply_event);
     630             :   }
     631          14 :   tp->reply_event = tor_event_new(base,
     632           7 :                                   tp->reply_queue->alert.read_fd,
     633             :                                   EV_READ|EV_PERSIST,
     634             :                                   reply_event_cb,
     635             :                                   tp);
     636           7 :   tor_assert(tp->reply_event);
     637           7 :   tp->reply_cb = cb;
     638           7 :   return event_add(tp->reply_event, NULL);
     639             : }
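
  A hypothetical sketch of this registration step, with an optional callback
  that runs in the main thread after each batch of replies has been processed
  (the callback body and log messages are purely illustrative).

    /* Hypothetical sketch -- hooking a pool's replies into the mainloop. */
    static void
    example_after_replies_cb(threadpool_t *tp)
    {
      (void)tp;
      log_debug(LD_GENERAL, "Finished processing a batch of worker replies.");
    }
    static void
    example_register(threadpool_t *pool)
    {
      if (threadpool_register_reply_event(pool, example_after_replies_cb) < 0) {
        log_warn(LD_GENERAL, "Couldn't register the reply event.");
      }
    }
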
     640             : 
     641             : /**
     642             :  * Process all pending replies on a reply queue. The main thread should call
     643             :  * this function every time the socket returned by replyqueue_get_socket() is
     644             :  * readable.
     645             :  */
     646             : void
     647       18995 : replyqueue_process(replyqueue_t *queue)
     648             : {
     649       18995 :   int r = queue->alert.drain_fn(queue->alert.read_fd);
     650       18995 :   if (r < 0) {
     651             :     //LCOV_EXCL_START
     652             :     static ratelim_t warn_limit = RATELIM_INIT(7200);
     653             :     log_fn_ratelim(&warn_limit, LOG_WARN, LD_GENERAL,
     654             :                  "Failure from drain_fd: %s",
     655             :                    tor_socket_strerror(-r));
     656             :     //LCOV_EXCL_STOP
     657             :   }
     658             : 
     659       18995 :   tor_mutex_acquire(&queue->lock);
     660       88986 :   while (!TOR_TAILQ_EMPTY(&queue->answers)) {
     661             :     /* lock must be held at this point.*/
     662       69991 :     workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers);
     663       69991 :     TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
     664       69991 :     tor_mutex_release(&queue->lock);
     665       69991 :     work->on_pool = NULL;
     666             : 
     667       69991 :     work->reply_fn(work->arg);
     668       69991 :     workqueue_entry_free(work);
     669             : 
     670       69991 :     tor_mutex_acquire(&queue->lock);
     671             :   }
     672             : 
     673       18995 :   tor_mutex_release(&queue->lock);
     674       18995 : }

Generated by: LCOV version 1.14