Line data Source code
1 : /* Copyright (c) 2003-2004, Roger Dingledine.
2 : * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
3 : * Copyright (c) 2007-2021, The Tor Project, Inc. */
4 : /* See LICENSE for licensing information */
5 :
6 : /**
7 : * \file cpuworker.c
8 : * \brief Uses the workqueue/threadpool code to farm CPU-intensive activities
9 : * out to subprocesses.
10 : *
11 : * The multithreading backend for this module is in workqueue.c; this module
12 : * specializes workqueue.c.
13 : *
14 : * Right now, we use this infrastructure
15 : * <ul><li>for processing onionskins in onion.c
16 : * <li>for compressing consensuses in consdiffmgr.c,
17 : * <li>and for calculating diffs and compressing them in consdiffmgr.c.
18 : * </ul>
19 : **/
20 : #include "core/or/or.h"
21 : #include "core/or/channel.h"
22 : #include "core/or/circuitlist.h"
23 : #include "core/or/connection_or.h"
24 : #include "app/config/config.h"
25 : #include "core/mainloop/cpuworker.h"
26 : #include "lib/crypt_ops/crypto_rand.h"
27 : #include "lib/crypt_ops/crypto_util.h"
28 : #include "core/or/onion.h"
29 : #include "feature/relay/circuitbuild_relay.h"
30 : #include "feature/relay/onion_queue.h"
31 : #include "feature/stats/rephist.h"
32 : #include "feature/relay/router.h"
33 : #include "lib/evloop/workqueue.h"
34 : #include "core/crypto/onion_crypto.h"
35 :
36 : #include "core/or/or_circuit_st.h"
37 :
38 : static void queue_pending_tasks(void);
39 :
/** Per-thread state for a CPU worker: the onion keys it uses for server-side
 * handshakes, plus a generation counter bumped on every key rotation. */
typedef struct worker_state_t {
  /** Incremented by update_state_threadfn() each time the keys change. */
  int generation;
  /** This thread's private copy of the relay's onion keys; owned here. */
  server_onion_keys_t *onion_keys;
} worker_state_t;
44 :
45 : static void *
46 0 : worker_state_new(void *arg)
47 : {
48 0 : worker_state_t *ws;
49 0 : (void)arg;
50 0 : ws = tor_malloc_zero(sizeof(worker_state_t));
51 0 : ws->onion_keys = server_onion_keys_new();
52 0 : return ws;
53 : }
54 :
/** Release <b>ws</b> and set the caller's pointer to NULL. */
#define worker_state_free(ws) \
  FREE_AND_NULL(worker_state_t, worker_state_free_, (ws))

/** Free a worker state and the onion keys it owns. NULL is a no-op. */
static void
worker_state_free_(worker_state_t *ws)
{
  if (!ws)
    return;
  server_onion_keys_free(ws->onion_keys);
  tor_free(ws);
}
66 :
/** Type-erased wrapper around worker_state_free_(), matching the
 * void(*)(void*) signature the workqueue API expects. */
static void
worker_state_free_void(void *arg)
{
  worker_state_free_(arg);
}
72 :
/** Queue on which worker threads post their replies for the main thread. */
static replyqueue_t *replyqueue = NULL;
/** Pool of worker threads that run CPU-heavy jobs; created in cpu_init(). */
static threadpool_t *threadpool = NULL;

/** Number of jobs currently queued on or running in the thread pool. */
static int total_pending_tasks = 0;
/** Cap on total_pending_tasks; recomputed from the CPU count in cpu_init(). */
static int max_pending_tasks = 128;
78 :
/** Initialize the cpuworker subsystem. It is OK to call this more than once
 * during Tor's lifetime: the reply queue and thread pool are created only on
 * the first call, but max_pending_tasks is recomputed every time (e.g. after
 * a configuration change to NumCPUs).
 */
void
cpu_init(void)
{
  if (!replyqueue) {
    replyqueue = replyqueue_new(0);
  }
  if (!threadpool) {
    /*
      In our threadpool implementation, half the threads are permissive and
      half are strict (when it comes to running lower-priority tasks). So we
      always make sure we have at least two threads, so that there will be at
      least one thread of each kind.
    */
    const int n_threads = get_num_cpus(get_options()) + 1;
    threadpool = threadpool_new(n_threads,
                                replyqueue,
                                worker_state_new,
                                worker_state_free_void,
                                NULL);

    /* Hook reply delivery into the main loop's event machinery so replies
     * are processed on the main thread. */
    int r = threadpool_register_reply_event(threadpool, NULL);

    tor_assert(r == 0);
  }

  /* Total voodoo. Can we make this more sensible? */
  max_pending_tasks = get_num_cpus(get_options()) * 64;
}
110 :
/** Magic numbers to make sure our cpuworker_requests don't grow any
 * mis-framing bugs. */
#define CPUWORKER_REQUEST_MAGIC 0xda4afeed
#define CPUWORKER_REPLY_MAGIC 0x5eedf00d

/** A request sent to a cpuworker. */
typedef struct cpuworker_request_t {
  /** Magic number; must be CPUWORKER_REQUEST_MAGIC. */
  uint32_t magic;

  /** Flag: Are we timing this request? */
  unsigned timed : 1;
  /** If we're timing this request, when was it sent to the cpuworker? */
  struct timeval started_at;

  /** A create cell for the cpuworker to process. */
  create_cell_t create_cell;

  /* Turn the above into a tagged union if needed. */
} cpuworker_request_t;
131 :
/** A reply sent by a cpuworker. */
typedef struct cpuworker_reply_t {
  /** Magic number; must be CPUWORKER_REPLY_MAGIC. */
  uint32_t magic;

  /** True iff we got a successful request. */
  uint8_t success;

  /** Are we timing this request? */
  unsigned int timed : 1;
  /** What handshake type was the request? (Used for timing) */
  uint16_t handshake_type;
  /** When did we send the request to the cpuworker? */
  struct timeval started_at;
  /** Once the cpuworker received the request, how many microseconds did it
   * take? (This shouldn't overflow; 4 billion microseconds is over an hour,
   * and we'll never have an onion handshake that takes so long.) */
  uint32_t n_usec;

  /** Output of processing a create cell
   *
   * @{
   */
  /** The created cell to send back. */
  created_cell_t created_cell;
  /** The keys to use on this circuit. */
  uint8_t keys[CPATH_KEY_MATERIAL_LEN];
  /** Input to use for authenticating introduce1 cells. */
  uint8_t rend_auth_material[DIGEST_LEN];
} cpuworker_reply_t;
162 :
/** A unit of work handed to a worker thread: the circuit it belongs to, plus
 * a union holding the request on the way in and the reply on the way back. */
typedef struct cpuworker_job_u_t {
  /** Circuit this onionskin is for. By the time the reply is handled, this
   * may instead carry DEAD_CIRCUIT_MAGIC, meaning the circuit was freed
   * while the job was pending (see cpuworker_onion_handshake_replyfn). */
  or_circuit_t *circ;
  union {
    /** Filled in by the main thread before queueing. */
    cpuworker_request_t request;
    /** Overwritten by the worker thread with the handshake result. */
    cpuworker_reply_t reply;
  } u;
} cpuworker_job_t;
170 :
/** Worker-thread update function: move the freshly loaded onion keys from
 * the <b>work_</b> update object (a worker_state_t built by
 * worker_state_new()) into this thread's <b>state_</b>, then bump the
 * generation counter. Runs once per worker after a key rotation. */
static workqueue_reply_t
update_state_threadfn(void *state_, void *work_)
{
  worker_state_t *state = state_;
  worker_state_t *update = work_;
  /* Transfer ownership of the new keys into our state; the update object
   * is then emptied and freed. */
  server_onion_keys_free(state->onion_keys);
  state->onion_keys = update->onion_keys;
  update->onion_keys = NULL;
  worker_state_free(update);
  ++state->generation;
  return WQ_RPL_REPLY;
}
183 :
/** Called when the onion key has changed so update all CPU worker(s) with
 * new function pointers with which a new state will be generated.
 */
void
cpuworkers_rotate_keyinfo(void)
{
  if (!threadpool) {
    /* If we're a client, then we won't have cpuworkers, and we won't need
     * to tell them to rotate their state.
     */
    return;
  }
  /* Ask each worker thread to build a fresh state via worker_state_new()
   * and install it via update_state_threadfn(). */
  if (threadpool_queue_update(threadpool,
                              worker_state_new,
                              update_state_threadfn,
                              worker_state_free_void,
                              NULL)) {
    log_warn(LD_OR, "Failed to queue key update for worker threads.");
  }
}
204 :
/* Running statistics used to estimate onionskin processing cost and the
 * overhead of the cpuworker framework; periodically halved (see the reply
 * handler) so they track recent behavior. */

/** Indexed by handshake type: how many onionskins have we processed and
 * counted of that type? */
static uint64_t onionskins_n_processed[MAX_ONION_HANDSHAKE_TYPE+1];
/** Indexed by handshake type, corresponding to the onionskins counted in
 * onionskins_n_processed: how many microseconds have we spent in cpuworkers
 * processing that kind of onionskin? */
static uint64_t onionskins_usec_internal[MAX_ONION_HANDSHAKE_TYPE+1];
/** Indexed by handshake type, corresponding to onionskins counted in
 * onionskins_n_processed: how many microseconds have we spent waiting for
 * cpuworkers to give us answers for that kind of onionskin?
 */
static uint64_t onionskins_usec_roundtrip[MAX_ONION_HANDSHAKE_TYPE+1];

/** If any onionskin takes longer than this, we clip them to this
 * time. (microseconds) */
#define MAX_BELIEVABLE_ONIONSKIN_DELAY (2*1000*1000)
221 :
222 : /** Return true iff we'd like to measure a handshake of type
223 : * <b>onionskin_type</b>. Call only from the main thread. */
224 : static int
225 0 : should_time_request(uint16_t onionskin_type)
226 : {
227 : /* If we've never heard of this type, we shouldn't even be here. */
228 0 : if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE)
229 : return 0;
230 : /* Measure the first N handshakes of each type, to ensure we have a
231 : * sample */
232 0 : if (onionskins_n_processed[onionskin_type] < 4096)
233 : return 1;
234 :
235 : /** Otherwise, measure with P=1/128. We avoid doing this for every
236 : * handshake, since the measurement itself can take a little time. */
237 0 : return crypto_fast_rng_one_in_n(get_thread_fast_rng(), 128);
238 : }
239 :
240 : /** Return an estimate of how many microseconds we will need for a single
241 : * cpuworker to process <b>n_requests</b> onionskins of type
242 : * <b>onionskin_type</b>. */
243 : uint64_t
244 0 : estimated_usec_for_onionskins(uint32_t n_requests, uint16_t onionskin_type)
245 : {
246 0 : if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */
247 0 : return 1000 * (uint64_t)n_requests;
248 0 : if (PREDICT_UNLIKELY(onionskins_n_processed[onionskin_type] < 100)) {
249 : /* Until we have 100 data points, just assume everything takes 1 msec. */
250 0 : return 1000 * (uint64_t)n_requests;
251 : } else {
252 : /* This can't overflow: we'll never have more than 500000 onionskins
253 : * measured in onionskin_usec_internal, and they won't take anything near
254 : * 1 sec each, and we won't have anything like 1 million queued
255 : * onionskins. But that's 5e5 * 1e6 * 1e6, which is still less than
256 : * UINT64_MAX. */
257 0 : return (onionskins_usec_internal[onionskin_type] * n_requests) /
258 : onionskins_n_processed[onionskin_type];
259 : }
260 : }
261 :
262 : /** Compute the absolute and relative overhead of using the cpuworker
263 : * framework for onionskins of type <b>onionskin_type</b>.*/
264 : static int
265 0 : get_overhead_for_onionskins(uint32_t *usec_out, double *frac_out,
266 : uint16_t onionskin_type)
267 : {
268 0 : uint64_t overhead;
269 :
270 0 : *usec_out = 0;
271 0 : *frac_out = 0.0;
272 :
273 0 : if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */
274 : return -1;
275 0 : if (onionskins_n_processed[onionskin_type] == 0 ||
276 0 : onionskins_usec_internal[onionskin_type] == 0 ||
277 0 : onionskins_usec_roundtrip[onionskin_type] == 0)
278 : return -1;
279 :
280 0 : overhead = onionskins_usec_roundtrip[onionskin_type] -
281 : onionskins_usec_internal[onionskin_type];
282 :
283 0 : *usec_out = (uint32_t)(overhead / onionskins_n_processed[onionskin_type]);
284 0 : *frac_out = ((double)overhead) / onionskins_usec_internal[onionskin_type];
285 :
286 0 : return 0;
287 : }
288 :
289 : /** If we've measured overhead for onionskins of type <b>onionskin_type</b>,
290 : * log it. */
291 : void
292 0 : cpuworker_log_onionskin_overhead(int severity, int onionskin_type,
293 : const char *onionskin_type_name)
294 : {
295 0 : uint32_t overhead;
296 0 : double relative_overhead;
297 0 : int r;
298 :
299 0 : r = get_overhead_for_onionskins(&overhead, &relative_overhead,
300 : onionskin_type);
301 0 : if (!overhead || r<0)
302 0 : return;
303 :
304 0 : log_fn(severity, LD_OR,
305 : "%s onionskins have averaged %u usec overhead (%.2f%%) in "
306 : "cpuworker code ",
307 : onionskin_type_name, (unsigned)overhead, relative_overhead*100);
308 : }
309 :
/** Handle a reply from the worker threads: record timing statistics, then
 * deliver the created cell (or the failure) to the circuit. Runs on the
 * main thread; consumes and frees <b>work_</b> (a cpuworker_job_t). */
static void
cpuworker_onion_handshake_replyfn(void *work_)
{
  cpuworker_job_t *job = work_;
  cpuworker_reply_t rpl;
  or_circuit_t *circ = NULL;

  tor_assert(total_pending_tasks > 0);
  --total_pending_tasks;

  /* Could avoid this, but doesn't matter. */
  memcpy(&rpl, &job->u.reply, sizeof(rpl));

  tor_assert(rpl.magic == CPUWORKER_REPLY_MAGIC);

  if (rpl.timed && rpl.success &&
      rpl.handshake_type <= MAX_ONION_HANDSHAKE_TYPE) {
    /* Time how long this request took. The handshake_type check should be
       needless, but let's leave it in to be safe. */
    struct timeval tv_end, tv_diff;
    int64_t usec_roundtrip;
    tor_gettimeofday(&tv_end);
    timersub(&tv_end, &rpl.started_at, &tv_diff);
    usec_roundtrip = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
    /* Discard negative (clock-jump) or implausibly large measurements. */
    if (usec_roundtrip >= 0 &&
        usec_roundtrip < MAX_BELIEVABLE_ONIONSKIN_DELAY) {
      ++onionskins_n_processed[rpl.handshake_type];
      onionskins_usec_internal[rpl.handshake_type] += rpl.n_usec;
      onionskins_usec_roundtrip[rpl.handshake_type] += usec_roundtrip;
      if (onionskins_n_processed[rpl.handshake_type] >= 500000) {
        /* Scale down every 500000 handshakes. On a busy server, that's
         * less impressive than it sounds. */
        onionskins_n_processed[rpl.handshake_type] /= 2;
        onionskins_usec_internal[rpl.handshake_type] /= 2;
        onionskins_usec_roundtrip[rpl.handshake_type] /= 2;
      }
    }
  }

  circ = job->circ;

  log_debug(LD_OR,
            "Unpacking cpuworker reply %p, circ=%p, success=%d",
            job, circ, rpl.success);

  if (circ->base_.magic == DEAD_CIRCUIT_MAGIC) {
    /* The circuit was supposed to get freed while the reply was
     * pending. Instead, it got left for us to free so that we wouldn't freak
     * out when the job->circ field wound up pointing to nothing. */
    log_debug(LD_OR, "Circuit died while reply was pending. Freeing memory.");
    circ->base_.magic = 0;
    tor_free(circ);
    goto done_processing;
  }

  circ->workqueue_entry = NULL;

  if (TO_CIRCUIT(circ)->marked_for_close) {
    /* We already marked this circuit; we can't call it open. */
    log_debug(LD_OR,"circuit is already marked.");
    goto done_processing;
  }

  if (rpl.success == 0) {
    log_debug(LD_OR,
              "decoding onionskin failed. "
              "(Old key or bad software.) Closing.");
    circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_TORPROTOCOL);
    goto done_processing;
  }

  if (onionskin_answer(circ,
                       &rpl.created_cell,
                       (const char*)rpl.keys, sizeof(rpl.keys),
                       rpl.rend_auth_material) < 0) {
    log_warn(LD_OR,"onionskin_answer failed. Closing.");
    circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_INTERNAL);
    goto done_processing;
  }
  log_debug(LD_OR,"onionskin_answer succeeded. Yay.");

 done_processing:
  /* Scrub the reply (it holds circuit key material) and the job, then see
   * whether any queued onionskins can use the slot we just freed. */
  memwipe(&rpl, 0, sizeof(rpl));
  memwipe(job, 0, sizeof(*job));
  tor_free(job);
  queue_pending_tasks();
}
398 :
399 : /** Implementation function for onion handshake requests. */
400 : static workqueue_reply_t
401 0 : cpuworker_onion_handshake_threadfn(void *state_, void *work_)
402 : {
403 0 : worker_state_t *state = state_;
404 0 : cpuworker_job_t *job = work_;
405 :
406 : /* variables for onion processing */
407 0 : server_onion_keys_t *onion_keys = state->onion_keys;
408 0 : cpuworker_request_t req;
409 0 : cpuworker_reply_t rpl;
410 :
411 0 : memcpy(&req, &job->u.request, sizeof(req));
412 :
413 0 : tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC);
414 0 : memset(&rpl, 0, sizeof(rpl));
415 :
416 0 : const create_cell_t *cc = &req.create_cell;
417 0 : created_cell_t *cell_out = &rpl.created_cell;
418 0 : struct timeval tv_start = {0,0}, tv_end;
419 0 : int n;
420 0 : rpl.timed = req.timed;
421 0 : rpl.started_at = req.started_at;
422 0 : rpl.handshake_type = cc->handshake_type;
423 0 : if (req.timed)
424 0 : tor_gettimeofday(&tv_start);
425 0 : n = onion_skin_server_handshake(cc->handshake_type,
426 0 : cc->onionskin, cc->handshake_len,
427 : onion_keys,
428 : cell_out->reply,
429 : rpl.keys, CPATH_KEY_MATERIAL_LEN,
430 : rpl.rend_auth_material);
431 0 : if (n < 0) {
432 : /* failure */
433 0 : log_debug(LD_OR,"onion_skin_server_handshake failed.");
434 0 : memset(&rpl, 0, sizeof(rpl));
435 0 : rpl.success = 0;
436 : } else {
437 : /* success */
438 0 : log_debug(LD_OR,"onion_skin_server_handshake succeeded.");
439 0 : cell_out->handshake_len = n;
440 0 : switch (cc->cell_type) {
441 0 : case CELL_CREATE:
442 0 : cell_out->cell_type = CELL_CREATED; break;
443 0 : case CELL_CREATE2:
444 0 : cell_out->cell_type = CELL_CREATED2; break;
445 0 : case CELL_CREATE_FAST:
446 0 : cell_out->cell_type = CELL_CREATED_FAST; break;
447 : default:
448 0 : tor_assert(0);
449 : return WQ_RPL_SHUTDOWN;
450 : }
451 0 : rpl.success = 1;
452 : }
453 0 : rpl.magic = CPUWORKER_REPLY_MAGIC;
454 0 : if (req.timed) {
455 0 : struct timeval tv_diff;
456 0 : int64_t usec;
457 0 : tor_gettimeofday(&tv_end);
458 0 : timersub(&tv_end, &tv_start, &tv_diff);
459 0 : usec = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
460 0 : if (usec < 0 || usec > MAX_BELIEVABLE_ONIONSKIN_DELAY)
461 0 : rpl.n_usec = MAX_BELIEVABLE_ONIONSKIN_DELAY;
462 : else
463 0 : rpl.n_usec = (uint32_t) usec;
464 : }
465 :
466 0 : memcpy(&job->u.reply, &rpl, sizeof(rpl));
467 :
468 0 : memwipe(&req, 0, sizeof(req));
469 0 : memwipe(&rpl, 0, sizeof(req));
470 0 : return WQ_RPL_REPLY;
471 : }
472 :
473 : /** Take pending tasks from the queue and assign them to cpuworkers. */
474 : static void
475 0 : queue_pending_tasks(void)
476 : {
477 0 : or_circuit_t *circ;
478 0 : create_cell_t *onionskin = NULL;
479 :
480 0 : while (total_pending_tasks < max_pending_tasks) {
481 0 : circ = onion_next_task(&onionskin);
482 :
483 0 : if (!circ)
484 0 : return;
485 :
486 0 : if (assign_onionskin_to_cpuworker(circ, onionskin) < 0)
487 0 : log_info(LD_OR,"assign_to_cpuworker failed. Ignoring.");
488 : }
489 : }
490 :
/** Queue a task on the cpuworker thread pool at the given <b>priority</b>:
 * run <b>fn</b> with <b>arg</b> on a worker thread, then run
 * <b>reply_fn</b> with the same <b>arg</b> on the main thread once the
 * worker finishes. Return the new workqueue entry on success, or NULL on
 * failure. Requires that cpu_init() has been called. */
MOCK_IMPL(workqueue_entry_t *,
cpuworker_queue_work,(workqueue_priority_t priority,
                      workqueue_reply_t (*fn)(void *, void *),
                      void (*reply_fn)(void *),
                      void *arg))
{
  tor_assert(threadpool);

  return threadpool_queue_work_priority(threadpool,
                                        priority,
                                        fn,
                                        reply_fn,
                                        arg);
}
506 :
507 : /** Try to tell a cpuworker to perform the public key operations necessary to
508 : * respond to <b>onionskin</b> for the circuit <b>circ</b>.
509 : *
510 : * Return 0 if we successfully assign the task, or -1 on failure.
511 : */
512 : int
513 0 : assign_onionskin_to_cpuworker(or_circuit_t *circ,
514 : create_cell_t *onionskin)
515 : {
516 0 : workqueue_entry_t *queue_entry;
517 0 : cpuworker_job_t *job;
518 0 : cpuworker_request_t req;
519 0 : int should_time;
520 :
521 0 : tor_assert(threadpool);
522 :
523 0 : if (!circ->p_chan) {
524 0 : log_info(LD_OR,"circ->p_chan gone. Failing circ.");
525 0 : tor_free(onionskin);
526 0 : return -1;
527 : }
528 :
529 0 : if (total_pending_tasks >= max_pending_tasks) {
530 0 : log_debug(LD_OR,"No idle cpuworkers. Queuing.");
531 0 : if (onion_pending_add(circ, onionskin) < 0) {
532 0 : tor_free(onionskin);
533 0 : return -1;
534 : }
535 : return 0;
536 : }
537 :
538 0 : if (!channel_is_client(circ->p_chan))
539 0 : rep_hist_note_circuit_handshake_assigned(onionskin->handshake_type);
540 :
541 0 : should_time = should_time_request(onionskin->handshake_type);
542 0 : memset(&req, 0, sizeof(req));
543 0 : req.magic = CPUWORKER_REQUEST_MAGIC;
544 0 : req.timed = should_time;
545 :
546 0 : memcpy(&req.create_cell, onionskin, sizeof(create_cell_t));
547 :
548 0 : tor_free(onionskin);
549 :
550 0 : if (should_time)
551 0 : tor_gettimeofday(&req.started_at);
552 :
553 0 : job = tor_malloc_zero(sizeof(cpuworker_job_t));
554 0 : job->circ = circ;
555 0 : memcpy(&job->u.request, &req, sizeof(req));
556 0 : memwipe(&req, 0, sizeof(req));
557 :
558 0 : ++total_pending_tasks;
559 0 : queue_entry = threadpool_queue_work_priority(threadpool,
560 : WQ_PRI_HIGH,
561 : cpuworker_onion_handshake_threadfn,
562 : cpuworker_onion_handshake_replyfn,
563 : job);
564 0 : if (!queue_entry) {
565 0 : log_warn(LD_BUG, "Couldn't queue work on threadpool");
566 0 : tor_free(job);
567 0 : return -1;
568 : }
569 :
570 0 : log_debug(LD_OR, "Queued task %p (qe=%p, circ=%p)",
571 : job, queue_entry, job->circ);
572 :
573 0 : circ->workqueue_entry = queue_entry;
574 :
575 0 : return 0;
576 : }
577 :
/** If <b>circ</b> has a pending handshake that hasn't been processed yet,
 * remove it from the worker queue. */
void
cpuworker_cancel_circ_handshake(or_circuit_t *circ)
{
  cpuworker_job_t *job;
  if (circ->workqueue_entry == NULL)
    return;

  job = workqueue_entry_cancel(circ->workqueue_entry);
  if (job) {
    /* It successfully cancelled. We own the job again: scrub it (it holds
     * onionskin material), free it, and release the pending-task slot. */
    memwipe(job, 0xe0, sizeof(*job));
    tor_free(job);
    tor_assert(total_pending_tasks > 0);
    --total_pending_tasks;
    /* if (!job), this is done in cpuworker_onion_handshake_replyfn. */
    circ->workqueue_entry = NULL;
  }
}
|