LCOV - code coverage report
Current view: top level - test - test_pubsub_msg.c (source / functions) Hit Total Coverage
Test: lcov.info Lines: 128 130 98.5 %
Date: 2021-11-24 03:28:48 Functions: 14 16 87.5 %

          Line data    Source code
       1             : /* Copyright (c) 2018-2021, The Tor Project, Inc. */
       2             : /* See LICENSE for licensing information */
       3             : 
       4             : #define DISPATCH_PRIVATE
       5             : 
       6             : #include "test/test.h"
       7             : 
       8             : #include "lib/dispatch/dispatch.h"
       9             : #include "lib/dispatch/dispatch_naming.h"
      10             : #include "lib/dispatch/dispatch_st.h"
      11             : #include "lib/dispatch/msgtypes.h"
      12             : #include "lib/pubsub/pubsub_flags.h"
      13             : #include "lib/pubsub/pub_binding_st.h"
      14             : #include "lib/pubsub/pubsub_build.h"
      15             : #include "lib/pubsub/pubsub_builder_st.h"
      16             : #include "lib/pubsub/pubsub_connect.h"
      17             : #include "lib/pubsub/pubsub_publish.h"
      18             : 
      19             : #include "lib/log/escape.h"
      20             : #include "lib/malloc/malloc.h"
      21             : #include "lib/string/printf.h"
      22             : 
      23             : #include <stdio.h>
      24             : #include <string.h>
      25             : 
      26             : static char *
      27         120 : ex_str_fmt(msg_aux_data_t aux)
      28             : {
      29         120 :   return esc_for_log(aux.ptr);
      30             : }
      31             : static void
      32         107 : ex_str_free(msg_aux_data_t aux)
      33             : {
      34         107 :   tor_free_(aux.ptr);
      35         107 : }
      36             : static dispatch_typefns_t stringfns = {
      37             :   .free_fn = ex_str_free,
      38             :   .fmt_fn = ex_str_fmt
      39             : };
      40             : 
      41             : // We're using the lowest-level publish/subscribe logic here, to avoid the
      42             : // pubsub_macros.h macros and just test the dispatch core.  We'll use a string
      43             : // type for everything.
      44             : 
      45             : #define DECLARE_MESSAGE(suffix)                         \
      46             :   static pub_binding_t pub_binding_##suffix;            \
      47             :   static int msg_received_##suffix = 0;                 \
      48             :   static void recv_msg_##suffix(const msg_t *m) {       \
      49             :     (void)m;                                            \
      50             :     ++msg_received_##suffix;                            \
      51             :   }                                                     \
      52             :   EAT_SEMICOLON
      53             : 
      54             : #define ADD_PUBLISH(binding_suffix, subsys, channel, msg, flags)        \
      55             :   STMT_BEGIN {                                                          \
      56             :     con = pubsub_connector_for_subsystem(builder,                       \
      57             :                                            get_subsys_id(#subsys));     \
      58             :     pubsub_add_pub_(con, &pub_binding_##binding_suffix,                 \
      59             :                       get_channel_id(#channel),                         \
      60             :                       get_message_id(#msg), get_msg_type_id("string"),  \
      61             :                       (flags), __FILE__, __LINE__);                     \
      62             :     pubsub_connector_free(con);                                         \
      63             :   } STMT_END
      64             : 
      65             : #define ADD_SUBSCRIBE(hook_suffix, subsys, channel, msg, flags)         \
      66             :   STMT_BEGIN {                                                          \
      67             :     con = pubsub_connector_for_subsystem(builder,                       \
      68             :                                            get_subsys_id(#subsys));     \
      69             :     pubsub_add_sub_(con, recv_msg_##hook_suffix,                        \
      70             :                       get_channel_id(#channel),                         \
      71             :                       get_message_id(#msg), get_msg_type_id("string"),  \
      72             :                       (flags), __FILE__, __LINE__);                     \
      73             :     pubsub_connector_free(con);                                         \
      74             :   } STMT_END
      75             : 
      76             : #define SEND(binding_suffix, val)                          \
      77             :   STMT_BEGIN {                                             \
      78             :     msg_aux_data_t data_;                                  \
      79             :     data_.ptr = tor_strdup(val);                           \
      80             :     pubsub_pub_(&pub_binding_##binding_suffix, data_);     \
      81             :   } STMT_END
      82             : 
      83          11 : DECLARE_MESSAGE(msg1);
      84           0 : DECLARE_MESSAGE(msg2);
      85           6 : DECLARE_MESSAGE(msg3);
      86           0 : DECLARE_MESSAGE(msg4);
      87           5 : DECLARE_MESSAGE(msg5);
      88             : 
      89             : static smartlist_t *strings_received = NULL;
      90             : static void
      91         995 : recv_msg_copy_string(const msg_t *m)
      92             : {
      93         995 :   const char *s = m->aux_data__.ptr;
      94         995 :   smartlist_add(strings_received, tor_strdup(s));
      95         995 : }
      96             : 
      97             : static void *
      98           5 : setup_dispatcher(const struct testcase_t *testcase)
      99             : {
     100           5 :   (void)testcase;
     101           5 :   pubsub_builder_t *builder = pubsub_builder_new();
     102           5 :   pubsub_connector_t *con;
     103             : 
     104             :   {
     105           5 :     con = pubsub_connector_for_subsystem(builder, get_subsys_id("types"));
     106           5 :     pubsub_connector_register_type_(con,
     107           5 :                                     get_msg_type_id("string"),
     108             :                                     &stringfns,
     109             :                                     "nowhere.c", 99);
     110           5 :     pubsub_connector_free(con);
     111             :   }
     112             :   // message1 has one publisher and one subscriber.
     113           5 :   ADD_PUBLISH(msg1, sys1, main, message1, 0);
     114           5 :   ADD_SUBSCRIBE(msg1, sys2, main, message1, 0);
     115             : 
     116             :   // message2 has a publisher and a stub subscriber.
     117           5 :   ADD_PUBLISH(msg2, sys1, main, message2, 0);
     118           5 :   ADD_SUBSCRIBE(msg2, sys2, main, message2, DISP_FLAG_STUB);
     119             : 
     120             :   // message3 has a publisher and three subscribers.
     121           5 :   ADD_PUBLISH(msg3, sys1, main, message3, 0);
     122           5 :   ADD_SUBSCRIBE(msg3, sys2, main, message3, 0);
     123           5 :   ADD_SUBSCRIBE(msg3, sys3, main, message3, 0);
     124           5 :   ADD_SUBSCRIBE(msg3, sys4, main, message3, 0);
     125             : 
     126             :   // message4 has one publisher and two subscribers, but it's on another
     127             :   // channel.
     128           5 :   ADD_PUBLISH(msg4, sys2, other, message4, 0);
     129           5 :   ADD_SUBSCRIBE(msg4, sys1, other, message4, 0);
     130           5 :   ADD_SUBSCRIBE(msg4, sys3, other, message4, 0);
     131             : 
     132             :   // message5 has a huge number of recipients.
     133           5 :   ADD_PUBLISH(msg5, sys3, main, message5, 0);
     134           5 :   ADD_SUBSCRIBE(msg5, sys4, main, message5, 0);
     135           5 :   ADD_SUBSCRIBE(msg5, sys5, main, message5, 0);
     136           5 :   ADD_SUBSCRIBE(msg5, sys6, main, message5, 0);
     137           5 :   ADD_SUBSCRIBE(msg5, sys7, main, message5, 0);
     138           5 :   ADD_SUBSCRIBE(msg5, sys8, main, message5, 0);
     139        4980 :   for (int i = 0; i < 1000-5; ++i) {
     140        4975 :     char *sys;
     141        4975 :     tor_asprintf(&sys, "xsys-%d", i);
     142        4975 :     con = pubsub_connector_for_subsystem(builder, get_subsys_id(sys));
     143        4975 :     pubsub_add_sub_(con, recv_msg_copy_string,
     144        4975 :                     get_channel_id("main"),
     145        4975 :                     get_message_id("message5"),
     146        4975 :                     get_msg_type_id("string"), 0, "here", 100);
     147        4975 :     pubsub_connector_free(con);
     148        4975 :     tor_free(sys);
     149             :   }
     150             : 
     151           5 :   return pubsub_builder_finalize(builder, NULL);
     152             : }
     153             : 
     154             : static int
     155           5 : cleanup_dispatcher(const struct testcase_t *testcase, void *dispatcher_)
     156             : {
     157           5 :   (void)testcase;
     158           5 :   dispatch_t *dispatcher = dispatcher_;
     159           5 :   dispatch_free(dispatcher);
     160           5 :   return 1;
     161             : }
     162             : 
     163             : static const struct testcase_setup_t dispatcher_setup = {
     164             :   setup_dispatcher, cleanup_dispatcher
     165             : };
     166             : 
     167             : static void
     168           1 : test_pubsub_msg_minimal(void *arg)
     169             : {
     170           1 :   dispatch_t *d = arg;
     171             : 
     172           1 :   tt_int_op(0, OP_EQ, msg_received_msg1);
     173           1 :   SEND(msg1, "hello world");
     174           1 :   tt_int_op(0, OP_EQ, msg_received_msg1); // hasn't actually arrived yet.
     175             : 
     176           1 :   tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000));
     177           1 :   tt_int_op(1, OP_EQ, msg_received_msg1); // we got the message!
     178             : 
     179           1 :  done:
     180           1 :   ;
     181           1 : }
     182             : 
     183             : static void
     184           1 : test_pubsub_msg_send_to_stub(void *arg)
     185             : {
     186           1 :   dispatch_t *d = arg;
     187             : 
     188           1 :   tt_int_op(0, OP_EQ, msg_received_msg2);
     189           1 :   SEND(msg2, "hello silence");
     190           1 :   tt_int_op(0, OP_EQ, msg_received_msg2); // hasn't actually arrived yet.
     191             : 
     192           1 :   tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000));
     193           1 :   tt_int_op(0, OP_EQ, msg_received_msg2); // doesn't arrive -- stub hook.
     194             : 
     195           1 :  done:
     196           1 :   ;
     197           1 : }
     198             : 
     199             : static void
     200           1 : test_pubsub_msg_cancel_msgs(void *arg)
     201             : {
     202           1 :   dispatch_t *d = arg;
     203             : 
     204           1 :   tt_int_op(0, OP_EQ, msg_received_msg1);
     205         101 :   for (int i = 0; i < 100; ++i) {
     206         100 :     SEND(msg1, "hello world");
     207             :   }
     208           1 :   tt_int_op(0, OP_EQ, msg_received_msg1); // hasn't actually arrived yet.
     209             : 
     210           1 :   tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 10));
     211           1 :   tt_int_op(10, OP_EQ, msg_received_msg1); // we got the message 10 times.
     212             : 
     213             :   // At this point, the dispatcher will be freed with queued, undelivered
     214             :   // messages.
     215           1 :  done:
     216           1 :   ;
     217           1 : }
     218             : 
     219             : struct alertfn_target {
     220             :   dispatch_t *d;
     221             :   channel_id_t ch;
     222             :   int count;
     223             : };
     224             : static void
     225           3 : alertfn_generic(dispatch_t *d, channel_id_t ch, void *arg)
     226             : {
     227           3 :   struct alertfn_target *t = arg;
     228           3 :   tt_ptr_op(d, OP_EQ, t->d);
     229           3 :   tt_int_op(ch, OP_EQ, t->ch);
     230           3 :   ++t->count;
     231           3 :  done:
     232           3 :   ;
     233           3 : }
     234             : 
     235             : static void
     236           1 : test_pubsub_msg_alertfns(void *arg)
     237             : {
     238           1 :   dispatch_t *d = arg;
     239           1 :   struct alertfn_target ch1_a = { d, get_channel_id("main"), 0 };
     240           1 :   struct alertfn_target ch2_a = { d, get_channel_id("other"), 0 };
     241             : 
     242           1 :   tt_int_op(0, OP_EQ,
     243             :             dispatch_set_alert_fn(d, get_channel_id("main"),
     244             :                                   alertfn_generic, &ch1_a));
     245           1 :   tt_int_op(0, OP_EQ,
     246             :             dispatch_set_alert_fn(d, get_channel_id("other"),
     247             :                                   alertfn_generic, &ch2_a));
     248             : 
     249           1 :   SEND(msg3, "hello");
     250           1 :   tt_int_op(ch1_a.count, OP_EQ, 1);
     251           1 :   SEND(msg3, "world");
     252           1 :   tt_int_op(ch1_a.count, OP_EQ, 1); // only the first message sends an alert
     253           1 :   tt_int_op(ch2_a.count, OP_EQ, 0); // no alert for 'other'
     254             : 
     255           1 :   SEND(msg4, "worse things happen in C");
     256           1 :   tt_int_op(ch2_a.count, OP_EQ, 1);
     257             : 
     258             :   // flush the first (main) channel...
     259           1 :   tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000));
     260           1 :   tt_int_op(6, OP_EQ, msg_received_msg3); // 3 subscribers, 2 instances.
     261             : 
     262             :   // now that the main channel is flushed, sending another message on it
     263             :   // starts another alert.
     264           1 :   tt_int_op(ch1_a.count, OP_EQ, 1);
     265           1 :   SEND(msg1, "plover");
     266           1 :   tt_int_op(ch1_a.count, OP_EQ, 2);
     267           1 :   tt_int_op(ch2_a.count, OP_EQ, 1);
     268             : 
     269           1 :  done:
     270           1 :   ;
     271           1 : }
     272             : 
     273             : /* try more than N_FAST_FNS hooks on msg5 */
     274             : static void
     275           1 : test_pubsub_msg_many_hooks(void *arg)
     276             : {
     277           1 :   dispatch_t *d = arg;
     278           1 :   strings_received = smartlist_new();
     279             : 
     280           1 :   tt_int_op(0, OP_EQ, msg_received_msg5);
     281           1 :   SEND(msg5, "hello world");
     282           1 :   tt_int_op(0, OP_EQ, msg_received_msg5);
     283           1 :   tt_int_op(0, OP_EQ, smartlist_len(strings_received));
     284             : 
     285           1 :   tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 100000));
     286           1 :   tt_int_op(5, OP_EQ, msg_received_msg5);
     287           1 :   tt_int_op(995, OP_EQ, smartlist_len(strings_received));
     288             : 
     289           1 :  done:
     290         996 :   SMARTLIST_FOREACH(strings_received, char *, s, tor_free(s));
     291           1 :   smartlist_free(strings_received);
     292           1 : }
     293             : 
     294             : #define T(name)                                                 \
     295             :   { #name, test_pubsub_msg_ ## name , TT_FORK,                \
     296             :       &dispatcher_setup, NULL }
     297             : 
     298             : struct testcase_t pubsub_msg_tests[] = {
     299             :   T(minimal),
     300             :   T(send_to_stub),
     301             :   T(cancel_msgs),
     302             :   T(alertfns),
     303             :   T(many_hooks),
     304             :   END_OF_TESTCASES
     305             : };

Generated by: LCOV version 1.14