test_inproc_connect.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. /*
  2. Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
  3. This file is part of libzmq, the ZeroMQ core engine in C++.
  4. libzmq is free software; you can redistribute it and/or modify it under
  5. the terms of the GNU Lesser General Public License (LGPL) as published
  6. by the Free Software Foundation; either version 3 of the License, or
  7. (at your option) any later version.
  8. As a special exception, the Contributors give you permission to link
  9. this library with independent modules to produce an executable,
  10. regardless of the license terms of these independent modules, and to
  11. copy and distribute the resulting executable under terms of your choice,
  12. provided that you also meet, for each linked independent module, the
  13. terms and conditions of the license of that module. An independent
  14. module is a module which is not derived from or based on this library.
  15. If you modify this library, you must extend this exception to your
  16. version of the library.
  17. libzmq is distributed in the hope that it will be useful, but WITHOUT
  18. ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  19. FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
  20. License for more details.
  21. You should have received a copy of the GNU Lesser General Public License
  22. along with this program. If not, see <http://www.gnu.org/licenses/>.
  23. */
  24. #include "testutil.hpp"
  25. #include "testutil_unity.hpp"
  26. SETUP_TEARDOWN_TESTCONTEXT
  27. static void pusher (void * /*unused*/)
  28. {
  29. // Connect first
  30. // do not use test_context_socket here, as it is not thread-safe
  31. void *connect_socket = zmq_socket (get_test_context (), ZMQ_PAIR);
  32. TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://sink"));
  33. // Queue up some data
  34. send_string_expect_success (connect_socket, "foobar", 0);
  35. // Cleanup
  36. TEST_ASSERT_SUCCESS_ERRNO (zmq_close (connect_socket));
  37. }
  38. static void simult_conn (void *endpt_)
  39. {
  40. // Pull out arguments - endpoint string
  41. const char *endpt = static_cast<const char *> (endpt_);
  42. // Connect
  43. // do not use test_context_socket here, as it is not thread-safe
  44. void *connect_socket = zmq_socket (get_test_context (), ZMQ_SUB);
  45. TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, endpt));
  46. // Cleanup
  47. TEST_ASSERT_SUCCESS_ERRNO (zmq_close (connect_socket));
  48. }
  49. static void simult_bind (void *endpt_)
  50. {
  51. // Pull out arguments - context followed by endpoint string
  52. const char *endpt = static_cast<const char *> (endpt_);
  53. // Bind
  54. // do not use test_context_socket here, as it is not thread-safe
  55. void *bind_socket = zmq_socket (get_test_context (), ZMQ_PUB);
  56. TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, endpt));
  57. // Cleanup
  58. TEST_ASSERT_SUCCESS_ERRNO (zmq_close (bind_socket));
  59. }
  60. void test_bind_before_connect ()
  61. {
  62. // Bind first
  63. void *bind_socket = test_context_socket (ZMQ_PAIR);
  64. TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://bbc"));
  65. // Now connect
  66. void *connect_socket = test_context_socket (ZMQ_PAIR);
  67. TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://bbc"));
  68. // Queue up some data
  69. send_string_expect_success (connect_socket, "foobar", 0);
  70. // Read pending message
  71. recv_string_expect_success (bind_socket, "foobar", 0);
  72. // Cleanup
  73. test_context_socket_close (connect_socket);
  74. test_context_socket_close (bind_socket);
  75. }
  76. void test_connect_before_bind ()
  77. {
  78. // Connect first
  79. void *connect_socket = test_context_socket (ZMQ_PAIR);
  80. TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://cbb"));
  81. // Queue up some data
  82. send_string_expect_success (connect_socket, "foobar", 0);
  83. // Now bind
  84. void *bind_socket = test_context_socket (ZMQ_PAIR);
  85. TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://cbb"));
  86. // Read pending message
  87. recv_string_expect_success (bind_socket, "foobar", 0);
  88. // Cleanup
  89. test_context_socket_close (connect_socket);
  90. test_context_socket_close (bind_socket);
  91. }
  92. void test_connect_before_bind_pub_sub ()
  93. {
  94. // Connect first
  95. void *connect_socket = test_context_socket (ZMQ_PUB);
  96. TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://cbbps"));
  97. // Queue up some data, this will be dropped
  98. send_string_expect_success (connect_socket, "before", 0);
  99. // Now bind
  100. void *bind_socket = test_context_socket (ZMQ_SUB);
  101. TEST_ASSERT_SUCCESS_ERRNO (
  102. zmq_setsockopt (bind_socket, ZMQ_SUBSCRIBE, "", 0));
  103. TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://cbbps"));
  104. // Wait for pub-sub connection to happen
  105. msleep (SETTLE_TIME);
  106. // Queue up some data, this not will be dropped
  107. send_string_expect_success (connect_socket, "after", 0);
  108. // Read pending message
  109. recv_string_expect_success (bind_socket, "after", 0);
  110. // Cleanup
  111. test_context_socket_close (connect_socket);
  112. test_context_socket_close (bind_socket);
  113. }
  114. void test_connect_before_bind_ctx_term ()
  115. {
  116. for (int i = 0; i < 20; ++i) {
  117. // Connect first
  118. void *connect_socket = test_context_socket (ZMQ_ROUTER);
  119. char ep[32];
  120. sprintf (ep, "inproc://cbbrr%d", i);
  121. TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, ep));
  122. // Cleanup
  123. test_context_socket_close (connect_socket);
  124. }
  125. }
  126. void test_multiple_connects ()
  127. {
  128. const unsigned int no_of_connects = 10;
  129. void *connect_socket[no_of_connects];
  130. // Connect first
  131. for (unsigned int i = 0; i < no_of_connects; ++i) {
  132. connect_socket[i] = test_context_socket (ZMQ_PUSH);
  133. TEST_ASSERT_SUCCESS_ERRNO (
  134. zmq_connect (connect_socket[i], "inproc://multiple"));
  135. // Queue up some data
  136. send_string_expect_success (connect_socket[i], "foobar", 0);
  137. }
  138. // Now bind
  139. void *bind_socket = test_context_socket (ZMQ_PULL);
  140. TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://multiple"));
  141. for (unsigned int i = 0; i < no_of_connects; ++i) {
  142. recv_string_expect_success (bind_socket, "foobar", 0);
  143. }
  144. // Cleanup
  145. for (unsigned int i = 0; i < no_of_connects; ++i) {
  146. test_context_socket_close (connect_socket[i]);
  147. }
  148. test_context_socket_close (bind_socket);
  149. }
  150. void test_multiple_threads ()
  151. {
  152. const unsigned int no_of_threads = 30;
  153. void *threads[no_of_threads];
  154. // Connect first
  155. for (unsigned int i = 0; i < no_of_threads; ++i) {
  156. threads[i] = zmq_threadstart (&pusher, NULL);
  157. }
  158. // Now bind
  159. void *bind_socket = test_context_socket (ZMQ_PULL);
  160. TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://sink"));
  161. for (unsigned int i = 0; i < no_of_threads; ++i) {
  162. // Read pending message
  163. recv_string_expect_success (bind_socket, "foobar", 0);
  164. }
  165. // Cleanup
  166. for (unsigned int i = 0; i < no_of_threads; ++i) {
  167. zmq_threadclose (threads[i]);
  168. }
  169. test_context_socket_close (bind_socket);
  170. }
  171. void test_simultaneous_connect_bind_threads ()
  172. {
  173. const unsigned int no_of_times = 50;
  174. void *threads[no_of_times * 2];
  175. void *thr_args[no_of_times];
  176. char endpts[no_of_times][20];
  177. // Set up thread arguments: context followed by endpoint string
  178. for (unsigned int i = 0; i < no_of_times; ++i) {
  179. thr_args[i] = (void *) endpts[i];
  180. sprintf (endpts[i], "inproc://foo_%d", i);
  181. }
  182. // Spawn all threads as simultaneously as possible
  183. for (unsigned int i = 0; i < no_of_times; ++i) {
  184. threads[i * 2 + 0] = zmq_threadstart (&simult_conn, thr_args[i]);
  185. threads[i * 2 + 1] = zmq_threadstart (&simult_bind, thr_args[i]);
  186. }
  187. // Close all threads
  188. for (unsigned int i = 0; i < no_of_times; ++i) {
  189. zmq_threadclose (threads[i * 2 + 0]);
  190. zmq_threadclose (threads[i * 2 + 1]);
  191. }
  192. }
  193. void test_routing_id ()
  194. {
  195. // Create the infrastructure
  196. void *sc = test_context_socket (ZMQ_DEALER);
  197. TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, "inproc://routing_id"));
  198. void *sb = test_context_socket (ZMQ_ROUTER);
  199. TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "inproc://routing_id"));
  200. // Send 2-part message.
  201. TEST_ASSERT_EQUAL_INT (
  202. 1, TEST_ASSERT_SUCCESS_ERRNO (zmq_send (sc, "A", 1, ZMQ_SNDMORE)));
  203. TEST_ASSERT_EQUAL_INT (
  204. 1, TEST_ASSERT_SUCCESS_ERRNO (zmq_send (sc, "B", 1, 0)));
  205. // Routing id comes first.
  206. zmq_msg_t msg;
  207. TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
  208. TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, sb, 0));
  209. TEST_ASSERT_EQUAL_INT (1, zmq_msg_more (&msg));
  210. // Then the first part of the message body.
  211. TEST_ASSERT_EQUAL_INT (
  212. 1, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, sb, 0)));
  213. TEST_ASSERT_EQUAL_INT (1, zmq_msg_more (&msg));
  214. // And finally, the second part of the message body.
  215. TEST_ASSERT_EQUAL_INT (
  216. 1, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, sb, 0)));
  217. TEST_ASSERT_EQUAL_INT (0, zmq_msg_more (&msg));
  218. // Deallocate the infrastructure.
  219. test_context_socket_close (sc);
  220. test_context_socket_close (sb);
  221. }
  222. void test_connect_only ()
  223. {
  224. void *connect_socket = test_context_socket (ZMQ_PUSH);
  225. TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://a"));
  226. test_context_socket_close (connect_socket);
  227. }
  228. void test_unbind ()
  229. {
  230. // Bind and unbind socket 1
  231. void *bind_socket1 = test_context_socket (ZMQ_PAIR);
  232. TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket1, "inproc://unbind"));
  233. TEST_ASSERT_SUCCESS_ERRNO (zmq_unbind (bind_socket1, "inproc://unbind"));
  234. // Bind socket 2
  235. void *bind_socket2 = test_context_socket (ZMQ_PAIR);
  236. TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket2, "inproc://unbind"));
  237. // Now connect
  238. void *connect_socket = test_context_socket (ZMQ_PAIR);
  239. TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://unbind"));
  240. // Queue up some data
  241. send_string_expect_success (connect_socket, "foobar", 0);
  242. // Read pending message
  243. recv_string_expect_success (bind_socket2, "foobar", 0);
  244. // Cleanup
  245. test_context_socket_close (connect_socket);
  246. test_context_socket_close (bind_socket1);
  247. test_context_socket_close (bind_socket2);
  248. }
  249. void test_shutdown_during_pend ()
  250. {
  251. // Connect first
  252. void *connect_socket = test_context_socket (ZMQ_PAIR);
  253. TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://cbb"));
  254. zmq_ctx_shutdown (get_test_context ());
  255. // Cleanup
  256. test_context_socket_close (connect_socket);
  257. }
  258. int main (void)
  259. {
  260. setup_test_environment ();
  261. UNITY_BEGIN ();
  262. RUN_TEST (test_bind_before_connect);
  263. RUN_TEST (test_connect_before_bind);
  264. RUN_TEST (test_connect_before_bind_pub_sub);
  265. RUN_TEST (test_connect_before_bind_ctx_term);
  266. RUN_TEST (test_multiple_connects);
  267. RUN_TEST (test_multiple_threads);
  268. RUN_TEST (test_simultaneous_connect_bind_threads);
  269. RUN_TEST (test_routing_id);
  270. RUN_TEST (test_connect_only);
  271. RUN_TEST (test_unbind);
  272. RUN_TEST (test_shutdown_during_pend);
  273. return UNITY_END ();
  274. }