test_xpub_verbose.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. /*
  2. Copyright (c) 2018 Contributors as noted in the AUTHORS file
  3. This file is part of libzmq, the ZeroMQ core engine in C++.
  4. libzmq is free software; you can redistribute it and/or modify it under
  5. the terms of the GNU Lesser General Public License (LGPL) as published
  6. by the Free Software Foundation; either version 3 of the License, or
  7. (at your option) any later version.
  8. As a special exception, the Contributors give you permission to link
  9. this library with independent modules to produce an executable,
  10. regardless of the license terms of these independent modules, and to
  11. copy and distribute the resulting executable under terms of your choice,
  12. provided that you also meet, for each linked independent module, the
  13. terms and conditions of the license of that module. An independent
  14. module is a module which is not derived from or based on this library.
  15. If you modify this library, you must extend this exception to your
  16. version of the library.
  17. libzmq is distributed in the hope that it will be useful, but WITHOUT
  18. ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  19. FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
  20. License for more details.
  21. You should have received a copy of the GNU Lesser General Public License
  22. along with this program. If not, see <http://www.gnu.org/licenses/>.
  23. */
  24. #include "testutil.hpp"
  25. #include "testutil_unity.hpp"
  26. SETUP_TEARDOWN_TESTCONTEXT
  27. const uint8_t unsubscribe_a_msg[] = {0, 'A'};
  28. const uint8_t subscribe_a_msg[] = {1, 'A'};
  29. const uint8_t subscribe_b_msg[] = {1, 'B'};
  30. const char test_endpoint[] = "inproc://soname";
  31. const char topic_a[] = "A";
  32. const char topic_b[] = "B";
  33. void test_xpub_verbose_one_sub ()
  34. {
  35. void *pub = test_context_socket (ZMQ_XPUB);
  36. TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, test_endpoint));
  37. void *sub = test_context_socket (ZMQ_SUB);
  38. TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, test_endpoint));
  39. // Subscribe for A
  40. TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic_a, 1));
  41. // Receive subscriptions from subscriber
  42. recv_array_expect_success (pub, subscribe_a_msg, 0);
  43. // Subscribe socket for B instead
  44. TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic_b, 1));
  45. // Receive subscriptions from subscriber
  46. recv_array_expect_success (pub, subscribe_b_msg, 0);
  47. // Subscribe again for A again
  48. TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic_a, 1));
  49. // This time it is duplicated, so it will be filtered out
  50. TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
  51. int verbose = 1;
  52. TEST_ASSERT_SUCCESS_ERRNO (
  53. zmq_setsockopt (pub, ZMQ_XPUB_VERBOSE, &verbose, sizeof (int)));
  54. // Subscribe socket for A again
  55. TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic_a, 1));
  56. // This time with VERBOSE the duplicated sub will be received
  57. recv_array_expect_success (pub, subscribe_a_msg, 0);
  58. // Sending A message and B Message
  59. send_string_expect_success (pub, topic_a, 0);
  60. send_string_expect_success (pub, topic_b, 0);
  61. recv_string_expect_success (sub, topic_a, 0);
  62. recv_string_expect_success (sub, topic_b, 0);
  63. // Clean up.
  64. test_context_socket_close (pub);
  65. test_context_socket_close (sub);
  66. }
  67. void create_xpub_with_2_subs (void **pub_, void **sub0_, void **sub1_)
  68. {
  69. *pub_ = test_context_socket (ZMQ_XPUB);
  70. TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (*pub_, test_endpoint));
  71. *sub0_ = test_context_socket (ZMQ_SUB);
  72. TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (*sub0_, test_endpoint));
  73. *sub1_ = test_context_socket (ZMQ_SUB);
  74. TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (*sub1_, test_endpoint));
  75. }
  76. void create_duplicate_subscription (void *pub_, void *sub0_, void *sub1_)
  77. {
  78. // Subscribe for A
  79. TEST_ASSERT_SUCCESS_ERRNO (
  80. zmq_setsockopt (sub0_, ZMQ_SUBSCRIBE, topic_a, 1));
  81. // Receive subscriptions from subscriber
  82. recv_array_expect_success (pub_, subscribe_a_msg, 0);
  83. // Subscribe again for A on the other socket
  84. TEST_ASSERT_SUCCESS_ERRNO (
  85. zmq_setsockopt (sub1_, ZMQ_SUBSCRIBE, topic_a, 1));
  86. // This time it is duplicated, so it will be filtered out by XPUB
  87. TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub_, NULL, 0, ZMQ_DONTWAIT));
  88. }
  89. void test_xpub_verbose_two_subs ()
  90. {
  91. void *pub, *sub0, *sub1;
  92. create_xpub_with_2_subs (&pub, &sub0, &sub1);
  93. create_duplicate_subscription (pub, sub0, sub1);
  94. // Subscribe socket for B instead
  95. TEST_ASSERT_SUCCESS_ERRNO (
  96. zmq_setsockopt (sub0, ZMQ_SUBSCRIBE, topic_b, 1));
  97. // Receive subscriptions from subscriber
  98. recv_array_expect_success (pub, subscribe_b_msg, 0);
  99. int verbose = 1;
  100. TEST_ASSERT_SUCCESS_ERRNO (
  101. zmq_setsockopt (pub, ZMQ_XPUB_VERBOSE, &verbose, sizeof (int)));
  102. // Subscribe socket for A again
  103. TEST_ASSERT_SUCCESS_ERRNO (
  104. zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic_a, 1));
  105. // This time with VERBOSE the duplicated sub will be received
  106. recv_array_expect_success (pub, subscribe_a_msg, 0);
  107. // Sending A message and B Message
  108. send_string_expect_success (pub, topic_a, 0);
  109. send_string_expect_success (pub, topic_b, 0);
  110. recv_string_expect_success (sub0, topic_a, 0);
  111. recv_string_expect_success (sub1, topic_a, 0);
  112. recv_string_expect_success (sub0, topic_b, 0);
  113. // Clean up.
  114. test_context_socket_close (pub);
  115. test_context_socket_close (sub0);
  116. test_context_socket_close (sub1);
  117. }
  118. void test_xpub_verboser_one_sub ()
  119. {
  120. // Create a publisher
  121. void *pub = test_context_socket (ZMQ_XPUB);
  122. TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, test_endpoint));
  123. // Create a subscriber
  124. void *sub = test_context_socket (ZMQ_SUB);
  125. TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, test_endpoint));
  126. // Unsubscribe for A, does not exist yet
  127. TEST_ASSERT_SUCCESS_ERRNO (
  128. zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, topic_a, 1));
  129. // Does not exist, so it will be filtered out by XSUB
  130. TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
  131. // Subscribe for A
  132. TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic_a, 1));
  133. // Receive subscriptions from subscriber
  134. recv_array_expect_success (pub, subscribe_a_msg, 0);
  135. // Subscribe again for A again, XSUB will increase refcount
  136. TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic_a, 1));
  137. // This time it is duplicated, so it will be filtered out by XPUB
  138. TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
  139. // Unsubscribe for A, this time it exists in XPUB
  140. TEST_ASSERT_SUCCESS_ERRNO (
  141. zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, topic_a, 1));
  142. // XSUB refcounts and will not actually send unsub to PUB until the number
  143. // of unsubs match the earlier subs
  144. TEST_ASSERT_SUCCESS_ERRNO (
  145. zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, topic_a, 1));
  146. // Receive unsubscriptions from subscriber
  147. recv_array_expect_success (pub, unsubscribe_a_msg, 0);
  148. // XSUB only sends the last and final unsub, so XPUB will only receive 1
  149. TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
  150. // Unsubscribe for A, does not exist anymore
  151. TEST_ASSERT_SUCCESS_ERRNO (
  152. zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, topic_a, 1));
  153. // Does not exist, so it will be filtered out by XSUB
  154. TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
  155. int verbose = 1;
  156. TEST_ASSERT_SUCCESS_ERRNO (
  157. zmq_setsockopt (pub, ZMQ_XPUB_VERBOSER, &verbose, sizeof (int)));
  158. // Subscribe socket for A again
  159. TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic_a, 1));
  160. // Receive subscriptions from subscriber, did not exist anymore
  161. recv_array_expect_success (pub, subscribe_a_msg, 0);
  162. // Sending A message to make sure everything still works
  163. send_string_expect_success (pub, topic_a, 0);
  164. recv_string_expect_success (sub, topic_a, 0);
  165. // Unsubscribe for A, this time it exists
  166. TEST_ASSERT_SUCCESS_ERRNO (
  167. zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, topic_a, 1));
  168. // Receive unsubscriptions from subscriber
  169. recv_array_expect_success (pub, unsubscribe_a_msg, 0);
  170. // Unsubscribe for A again, it does not exist anymore so XSUB will filter
  171. TEST_ASSERT_SUCCESS_ERRNO (
  172. zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, topic_a, 1));
  173. // XSUB only sends unsub if it matched it in its trie, IOW: it will only
  174. // send it if it existed in the first place even with XPUB_VERBBOSER
  175. TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
  176. // Clean up.
  177. test_context_socket_close (pub);
  178. test_context_socket_close (sub);
  179. }
  180. void test_xpub_verboser_two_subs ()
  181. {
  182. void *pub, *sub0, *sub1;
  183. create_xpub_with_2_subs (&pub, &sub0, &sub1);
  184. create_duplicate_subscription (pub, sub0, sub1);
  185. // Unsubscribe for A, this time it exists in XPUB
  186. TEST_ASSERT_SUCCESS_ERRNO (
  187. zmq_setsockopt (sub0, ZMQ_UNSUBSCRIBE, topic_a, 1));
  188. // sub1 is still subscribed, so no notification
  189. TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
  190. // Unsubscribe the second socket to trigger the notification
  191. TEST_ASSERT_SUCCESS_ERRNO (
  192. zmq_setsockopt (sub1, ZMQ_UNSUBSCRIBE, topic_a, 1));
  193. // Receive unsubscriptions since all sockets are gone
  194. recv_array_expect_success (pub, unsubscribe_a_msg, 0);
  195. // Make really sure there is only one notification
  196. TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
  197. int verbose = 1;
  198. TEST_ASSERT_SUCCESS_ERRNO (
  199. zmq_setsockopt (pub, ZMQ_XPUB_VERBOSER, &verbose, sizeof (int)));
  200. // Subscribe socket for A again
  201. TEST_ASSERT_SUCCESS_ERRNO (
  202. zmq_setsockopt (sub0, ZMQ_SUBSCRIBE, topic_a, 1));
  203. // Subscribe socket for A again
  204. TEST_ASSERT_SUCCESS_ERRNO (
  205. zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic_a, 1));
  206. // Receive subscriptions from subscriber, did not exist anymore
  207. recv_array_expect_success (pub, subscribe_a_msg, 0);
  208. // VERBOSER is set, so subs from both sockets are received
  209. recv_array_expect_success (pub, subscribe_a_msg, 0);
  210. // Sending A message to make sure everything still works
  211. send_string_expect_success (pub, topic_a, 0);
  212. recv_string_expect_success (sub0, topic_a, 0);
  213. recv_string_expect_success (sub1, topic_a, 0);
  214. // Unsubscribe for A
  215. TEST_ASSERT_SUCCESS_ERRNO (
  216. zmq_setsockopt (sub1, ZMQ_UNSUBSCRIBE, topic_a, 1));
  217. // Receive unsubscriptions from first subscriber due to VERBOSER
  218. recv_array_expect_success (pub, unsubscribe_a_msg, 0);
  219. // Unsubscribe for A again from the other socket
  220. TEST_ASSERT_SUCCESS_ERRNO (
  221. zmq_setsockopt (sub0, ZMQ_UNSUBSCRIBE, topic_a, 1));
  222. // Receive unsubscriptions from first subscriber due to VERBOSER
  223. recv_array_expect_success (pub, unsubscribe_a_msg, 0);
  224. // Unsubscribe again to make sure it gets filtered now
  225. TEST_ASSERT_SUCCESS_ERRNO (
  226. zmq_setsockopt (sub1, ZMQ_UNSUBSCRIBE, topic_a, 1));
  227. // Unmatched, so XSUB filters even with VERBOSER
  228. TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
  229. // Clean up.
  230. test_context_socket_close (pub);
  231. test_context_socket_close (sub0);
  232. test_context_socket_close (sub1);
  233. }
  234. int main ()
  235. {
  236. setup_test_environment ();
  237. UNITY_BEGIN ();
  238. RUN_TEST (test_xpub_verbose_one_sub);
  239. RUN_TEST (test_xpub_verbose_two_subs);
  240. RUN_TEST (test_xpub_verboser_one_sub);
  241. RUN_TEST (test_xpub_verboser_two_subs);
  242. return UNITY_END ();
  243. }