39 #include "ext/tor_queue.h"
40 #include <event2/event.h>
43 #define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH
44 #define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW
45 #define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1)
48 typedef struct work_tailq_t work_tailq_t;
60 work_tailq_t
work[WORKQUEUE_N_PRIORITIES];
85 void *(*new_thread_state_fn)(
void*);
86 void (*free_thread_state_fn)(
void*);
87 void *new_thread_state_arg;
91 #define workqueue_priority_bitfield_t ENUM_BF(workqueue_priority_t)
93 #define WORKQUEUE_PRIORITY_BITS 2
110 void (*reply_fn)(
void *arg);
149 void (*reply_fn)(
void*),
154 ent->reply_fn = reply_fn;
156 ent->priority = WQ_PRI_HIGH;
160 #define workqueue_entry_free(ent) \
161 FREE_AND_NULL(workqueue_entry_t, workqueue_entry_free_, (ent))
172 memset(ent, 0xf0,
sizeof(*ent));
198 TOR_TAILQ_REMOVE(&ent->on_pool->work[prio], ent, next_work);
205 workqueue_entry_free(ent);
217 for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
232 work_tailq_t *queue = NULL, *this_queue;
234 for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
235 this_queue = &pool->
work[i];
236 if (!TOR_TAILQ_EMPTY(this_queue)) {
253 TOR_TAILQ_REMOVE(queue,
work, next_work);
284 if (r != WQ_RPL_REPLY) {
292 if (BUG(
work == NULL))
304 if (result != WQ_RPL_REPLY) {
328 was_empty = TOR_TAILQ_EMPTY(&queue->answers);
329 TOR_TAILQ_INSERT_TAIL(&queue->answers,
work, next_work);
333 if (queue->alert.alert_fn(queue->alert.write_fd) < 0) {
354 log_err(
LD_GENERAL,
"Can't launch worker thread.");
389 void (*reply_fn)(
void *),
392 tor_assert(((
int)prio) >= WORKQUEUE_PRIORITY_FIRST &&
393 ((
int)prio) <= WORKQUEUE_PRIORITY_LAST);
398 ent->priority = prio;
402 TOR_TAILQ_INSERT_TAIL(&pool->
work[prio], ent, next_work);
415 void (*reply_fn)(
void *),
438 void *(*dup_fn)(
void *),
440 void (*free_fn)(
void *),
444 void (*old_args_free_fn)(
void *arg);
453 new_args = tor_calloc(
n_threads,
sizeof(
void*));
456 new_args[i] = dup_fn(arg);
472 if (old_args[i] && old_args_free_fn)
473 old_args_free_fn(old_args[i]);
482 #define MAX_THREADS 1024
489 #define CHANCE_PERMISSIVE 37
490 #define CHANCE_STRICT INT32_MAX
520 pool->free_thread_state_fn(state);
544 void (*free_thread_state_fn)(
void*),
552 for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
553 TOR_TAILQ_INIT(&pool->
work[i]);
557 pool->new_thread_state_arg = arg;
558 pool->free_thread_state_fn = free_thread_state_fn;
599 TOR_TAILQ_INIT(&rq->answers);
649 int r = queue->alert.drain_fn(queue->alert.read_fd);
652 static ratelim_t warn_limit = RATELIM_INIT(7200);
654 "Failure from drain_fd: %s",
655 tor_socket_strerror(-r));
660 while (!TOR_TAILQ_EMPTY(&queue->answers)) {
663 TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
665 work->on_pool = NULL;
667 work->reply_fn(work->arg);
668 workqueue_entry_free(work);
int alert_sockets_create(alert_sockets_t *socks_out, uint32_t flags)
struct event_base * tor_libevent_get_base(void)
Header for compat_libevent.c.
void tor_mutex_init_nonrecursive(tor_mutex_t *m)
void tor_mutex_release(tor_mutex_t *m)
void tor_mutex_init(tor_mutex_t *m)
void tor_mutex_acquire(tor_mutex_t *m)
void tor_mutex_uninit(tor_mutex_t *m)
void tor_cond_signal_all(tor_cond_t *cond)
int tor_cond_init(tor_cond_t *cond)
int spawn_func(void(*func)(void *), void *data)
void tor_cond_uninit(tor_cond_t *cond)
void tor_cond_signal_one(tor_cond_t *cond)
int tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex, const struct timeval *tv)
Common functions for using (pseudo-)random number generators.
#define crypto_fast_rng_one_in_n(rng, n)
crypto_fast_rng_t * get_thread_fast_rng(void)
#define log_fn_ratelim(ratelim, severity, domain, args,...)
Summarize similar messages that would otherwise flood the logs.
void *(* new_thread_state_fn)(void *)
work_tailq_t work[WORKQUEUE_N_PRIORITIES]
void(* free_update_arg_fn)(void *)
struct event * reply_event
workqueue_reply_t(* update_fn)(void *, void *)
struct workerthread_t ** threads
replyqueue_t * reply_queue
int32_t lower_priority_chance
struct threadpool_t * in_pool
replyqueue_t * reply_queue
Macros to manage assertions, fatal and non-fatal.
#define tor_assert_nonfatal_unreached()
threadpool_t * threadpool_new(int n_threads, replyqueue_t *replyqueue, void *(*new_thread_state_fn)(void *), void(*free_thread_state_fn)(void *), void *arg)
#define workqueue_priority_bitfield_t
static workerthread_t * workerthread_new(int32_t lower_priority_chance, void *state, threadpool_t *pool, replyqueue_t *replyqueue)
void * workqueue_entry_cancel(workqueue_entry_t *ent)
static int worker_thread_has_work(workerthread_t *thread)
static workqueue_entry_t * worker_thread_extract_next_work(workerthread_t *thread)
static int threadpool_start_threads(threadpool_t *pool, int n)
replyqueue_t * threadpool_get_replyqueue(threadpool_t *tp)
static void workqueue_entry_free_(workqueue_entry_t *ent)
void replyqueue_process(replyqueue_t *queue)
replyqueue_t * replyqueue_new(uint32_t alertsocks_flags)
static void worker_thread_main(void *thread_)
static void reply_event_cb(evutil_socket_t sock, short events, void *arg)
workqueue_entry_t * threadpool_queue_work_priority(threadpool_t *pool, workqueue_priority_t prio, workqueue_reply_t(*fn)(void *, void *), void(*reply_fn)(void *), void *arg)
int threadpool_register_reply_event(threadpool_t *tp, void(*cb)(threadpool_t *tp))
int threadpool_queue_update(threadpool_t *pool, void *(*dup_fn)(void *), workqueue_reply_t(*fn)(void *, void *), void(*free_fn)(void *), void *arg)
static workqueue_entry_t * workqueue_entry_new(workqueue_reply_t(*fn)(void *, void *), void(*reply_fn)(void *), void *arg)
static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work)
#define CHANCE_PERMISSIVE
workqueue_entry_t * threadpool_queue_work(threadpool_t *pool, workqueue_reply_t(*fn)(void *, void *), void(*reply_fn)(void *), void *arg)
#define WORKQUEUE_PRIORITY_BITS