LCOV - code coverage report
Current view: top level - lib/dispatch - dispatch_core.c (source / functions) Hit Total Coverage
Test: lcov.info Lines: 84 92 91.3 %
Date: 2021-11-24 03:28:48 Functions: 9 9 100.0 %

          Line data    Source code
       1             : /* Copyright (c) 2001, Matej Pfajfar.
       2             :  * Copyright (c) 2001-2004, Roger Dingledine.
       3             :  * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
       4             :  * Copyright (c) 2007-2021, The Tor Project, Inc. */
       5             : /* See LICENSE for licensing information */
       6             : 
       7             : /**
       8             :  * \file dispatch_core.c
       9             :  * \brief Core module for sending and receiving messages.
      10             :  */
      11             : 
      12             : #define DISPATCH_PRIVATE
      13             : #include "orconfig.h"
      14             : 
      15             : #include "lib/dispatch/dispatch.h"
      16             : #include "lib/dispatch/dispatch_st.h"
      17             : #include "lib/dispatch/dispatch_naming.h"
      18             : 
      19             : #include "lib/malloc/malloc.h"
      20             : #include "lib/log/util_bug.h"
      21             : 
      22             : #include <string.h>
      23             : 
      24             : /**
      25             :  * Use <b>d</b> to drop all storage held for <b>msg</b>.
      26             :  *
      27             :  * (We need the dispatcher so we know how to free the auxiliary data.)
      28             :  **/
      29             : void
      30         150 : dispatch_free_msg_(const dispatch_t *d, msg_t *msg)
      31             : {
      32         150 :   if (!msg)
      33             :     return;
      34             : 
      35         150 :   d->typefns[msg->type].free_fn(msg->aux_data__);
      36         150 :   tor_free(msg);
      37             : }
      38             : 
      39             : /**
      40             :  * Format the auxiliary data held by msg.
      41             :  **/
      42             : char *
      43         211 : dispatch_fmt_msg_data(const dispatch_t *d, const msg_t *msg)
      44             : {
      45         211 :   if (!msg)
      46             :     return NULL;
      47             : 
      48         211 :   return d->typefns[msg->type].fmt_fn(msg->aux_data__);
      49             : }
      50             : 
      51             : /**
      52             :  * Release all storage held by <b>d</b>.
      53             :  **/
      54             : void
      55         501 : dispatch_free_(dispatch_t *d)
      56             : {
      57         501 :   if (d == NULL)
      58             :     return;
      59             : 
      60         260 :   size_t n_queues = d->n_queues;
      61         781 :   for (size_t i = 0; i < n_queues; ++i) {
      62         521 :     msg_t *m, *mtmp;
      63         613 :     TOR_SIMPLEQ_FOREACH_SAFE(m, &d->queues[i].queue, next, mtmp) {
      64          92 :       dispatch_free_msg(d, m);
      65             :     }
      66             :   }
      67             : 
      68         260 :   size_t n_msgs = d->n_msgs;
      69             : 
      70        1791 :   for (size_t i = 0; i < n_msgs; ++i) {
      71        1531 :     tor_free(d->table[i]);
      72             :   }
      73         260 :   tor_free(d->table);
      74         260 :   tor_free(d->typefns);
      75         260 :   tor_free(d->queues);
      76             : 
      77             :   // This is the only time we will treat d->cfg as non-const.
      78             :   //dispatch_cfg_free_((dispatch_items_t *) d->cfg);
      79             : 
      80         260 :   tor_free(d);
      81             : }
      82             : 
      83             : /**
      84             :  * Tell the dispatcher to call <b>fn</b> with <b>userdata</b> whenever
      85             :  * <b>chan</b> becomes nonempty.  Return 0 on success, -1 on error.
      86             :  **/
      87             : int
      88          23 : dispatch_set_alert_fn(dispatch_t *d, channel_id_t chan,
      89             :                       dispatch_alertfn_t fn, void *userdata)
      90             : {
      91          23 :   if (BUG(chan >= d->n_queues))
      92           0 :     return -1;
      93             : 
      94          23 :   dqueue_t *q = &d->queues[chan];
      95          23 :   q->alert_fn = fn;
      96          23 :   q->alert_fn_arg = userdata;
      97          23 :   return 0;
      98             : }
      99             : 
     100             : /**
     101             :  * Send a message on the appropriate channel notifying that channel if
     102             :  * necessary.
     103             :  *
     104             :  * This function takes ownership of the auxiliary data; it can't be static or
     105             :  * stack-allocated, and the caller is not allowed to use it afterwards.
     106             :  *
     107             :  * This function does not check the various vields of the message object for
     108             :  * consistency.
     109             :  **/
     110             : int
     111           4 : dispatch_send(dispatch_t *d,
     112             :               subsys_id_t sender,
     113             :               channel_id_t channel,
     114             :               message_id_t msg,
     115             :               msg_type_id_t type,
     116             :               msg_aux_data_t auxdata)
     117             : {
     118           4 :   if (!d->table[msg]) {
     119             :     /* Fast path: nobody wants this data. */
     120             : 
     121           1 :     d->typefns[type].free_fn(auxdata);
     122           1 :     return 0;
     123             :   }
     124             : 
     125           3 :   msg_t *m = tor_malloc(sizeof(msg_t));
     126             : 
     127           3 :   m->sender = sender;
     128           3 :   m->channel = channel;
     129           3 :   m->msg = msg;
     130           3 :   m->type = type;
     131           3 :   memcpy(&m->aux_data__, &auxdata, sizeof(msg_aux_data_t));
     132             : 
     133           3 :   return dispatch_send_msg(d, m);
     134             : }
     135             : 
     136             : int
     137           3 : dispatch_send_msg(dispatch_t *d, msg_t *m)
     138             : {
     139           3 :   if (BUG(!d))
     140           0 :     goto err;
     141           3 :   if (BUG(!m))
     142           0 :     goto err;
     143           3 :   if (BUG(m->channel >= d->n_queues))
     144           0 :     goto err;
     145           3 :   if (BUG(m->msg >= d->n_msgs))
     146           0 :     goto err;
     147             : 
     148           3 :   dtbl_entry_t *ent = d->table[m->msg];
     149           3 :   if (ent) {
     150           3 :     if (BUG(m->type != ent->type))
     151           0 :       goto err;
     152           3 :     if (BUG(m->channel != ent->channel))
     153           0 :       goto err;
     154             :   }
     155             : 
     156           3 :   return dispatch_send_msg_unchecked(d, m);
     157             :  err:
     158             :   /* Probably it isn't safe to free m, since type could be wrong. */
     159             :   return -1;
     160             : }
     161             : 
     162             : /**
     163             :  * Send a message on the appropriate queue, notifying that queue if necessary.
     164             :  *
     165             :  * This function takes ownership of the message object and its auxiliary data;
     166             :  * it can't be static or stack-allocated, and the caller isn't allowed to use
     167             :  * it afterwards.
     168             :  *
     169             :  * This function does not check the various fields of the message object for
     170             :  * consistency, and can crash if they are out of range.  Only functions that
     171             :  * have already constructed the message in a safe way, or checked it for
     172             :  * correctness themselves, should call this function.
     173             :  **/
     174             : int
     175         150 : dispatch_send_msg_unchecked(dispatch_t *d, msg_t *m)
     176             : {
     177             :   /* Find the right queue. */
     178         150 :   dqueue_t *q = &d->queues[m->channel];
     179         150 :   bool was_empty = TOR_SIMPLEQ_EMPTY(&q->queue);
     180             : 
     181             :   /* Append the message. */
     182         150 :   TOR_SIMPLEQ_INSERT_TAIL(&q->queue, m, next);
     183             : 
     184         150 :   if (debug_logging_enabled()) {
     185         150 :     char *arg = dispatch_fmt_msg_data(d, m);
     186         150 :     log_debug(LD_MESG,
     187             :               "Queued: %s (%s) from %s, on %s.",
     188             :               get_message_id_name(m->msg),
     189             :               arg,
     190             :               get_subsys_id_name(m->sender),
     191             :               get_channel_id_name(m->channel));
     192         150 :     tor_free(arg);
     193             :   }
     194             : 
     195             :   /* If we just made the queue nonempty for the first time, call the alert
     196             :    * function. */
     197         150 :   if (was_empty) {
     198          50 :     q->alert_fn(d, m->channel, q->alert_fn_arg);
     199             :   }
     200             : 
     201         150 :   return 0;
     202             : }
     203             : 
     204             : /**
     205             :  * Run all of the callbacks on <b>d</b> associated with <b>m</b>.
     206             :  **/
     207             : static void
     208          58 : dispatcher_run_msg_cbs(const dispatch_t *d, msg_t *m)
     209             : {
     210          58 :   tor_assert(m->msg <= d->n_msgs);
     211          58 :   dtbl_entry_t *ent = d->table[m->msg];
     212          58 :   int n_fns = ent->n_fns;
     213             : 
     214          58 :   if (debug_logging_enabled()) {
     215          58 :     char *arg = dispatch_fmt_msg_data(d, m);
     216          58 :     log_debug(LD_MESG,
     217             :               "Delivering: %s (%s) from %s, on %s:",
     218             :               get_message_id_name(m->msg),
     219             :               arg,
     220             :               get_subsys_id_name(m->sender),
     221             :               get_channel_id_name(m->channel));
     222          58 :     tor_free(arg);
     223             :   }
     224             : 
     225          58 :   int i;
     226        1128 :   for (i=0; i < n_fns; ++i) {
     227        1070 :     if (ent->rcv[i].enabled) {
     228        1070 :       log_debug(LD_MESG, "  Delivering to %s.",
     229             :                 get_subsys_id_name(ent->rcv[i].sys));
     230        1070 :       ent->rcv[i].fn(m);
     231             :     }
     232             :   }
     233          58 : }
     234             : 
     235             : /**
     236             :  * Run up to <b>max_msgs</b> callbacks for messages on the channel <b>ch</b>
     237             :  * on the given dispatcher.  Return 0 on success or recoverable failure,
     238             :  * -1 on unrecoverable error.
     239             :  **/
     240             : int
     241          50 : dispatch_flush(dispatch_t *d, channel_id_t ch, int max_msgs)
     242             : {
     243          50 :   if (BUG(ch >= d->n_queues))
     244           0 :     return 0;
     245             : 
     246          50 :   int n_flushed = 0;
     247          50 :   dqueue_t *q = &d->queues[ch];
     248             : 
     249         108 :   while (n_flushed < max_msgs) {
     250         107 :     msg_t *m = TOR_SIMPLEQ_FIRST(&q->queue);
     251         107 :     if (!m)
     252             :       break;
     253          58 :     TOR_SIMPLEQ_REMOVE_HEAD(&q->queue, next);
     254          58 :     dispatcher_run_msg_cbs(d, m);
     255          58 :     dispatch_free_msg(d, m);
     256          58 :     ++n_flushed;
     257             :   }
     258             : 
     259             :   return 0;
     260             : }

Generated by: LCOV version 1.14