test_immediate.cpp 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. /*
  2. Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
  3. This file is part of libzmq, the ZeroMQ core engine in C++.
  4. libzmq is free software; you can redistribute it and/or modify it under
  5. the terms of the GNU Lesser General Public License (LGPL) as published
  6. by the Free Software Foundation; either version 3 of the License, or
  7. (at your option) any later version.
  8. As a special exception, the Contributors give you permission to link
  9. this library with independent modules to produce an executable,
  10. regardless of the license terms of these independent modules, and to
  11. copy and distribute the resulting executable under terms of your choice,
  12. provided that you also meet, for each linked independent module, the
  13. terms and conditions of the license of that module. An independent
  14. module is a module which is not derived from or based on this library.
  15. If you modify this library, you must extend this exception to your
  16. version of the library.
  17. libzmq is distributed in the hope that it will be useful, but WITHOUT
  18. ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  19. FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
  20. License for more details.
  21. You should have received a copy of the GNU Lesser General Public License
  22. along with this program. If not, see <http://www.gnu.org/licenses/>.
  23. */
  24. #include "testutil.hpp"
  25. #include "testutil_unity.hpp"
  26. SETUP_TEARDOWN_TESTCONTEXT
  27. void test_immediate_1 ()
  28. {
  29. int val;
  30. int rc;
  31. char buffer[16];
  32. size_t len = MAX_SOCKET_STRING;
  33. char my_endpoint[MAX_SOCKET_STRING];
  34. // TEST 1.
  35. // First we're going to attempt to send messages to two
  36. // pipes, one connected, the other not. We should see
  37. // the PUSH load balancing to both pipes, and hence half
  38. // of the messages getting queued, as connect() creates a
  39. // pipe immediately.
  40. void *to = test_context_socket (ZMQ_PULL);
  41. // Bind the one valid receiver
  42. val = 0;
  43. TEST_ASSERT_SUCCESS_ERRNO (
  44. zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof (val)));
  45. bind_loopback_ipv4 (to, my_endpoint, len);
  46. // Create a socket pushing to two endpoints - only 1 message should arrive.
  47. void *from = test_context_socket (ZMQ_PUSH);
  48. val = 0;
  49. TEST_ASSERT_SUCCESS_ERRNO (
  50. zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof (val)));
  51. // This pipe will not connect (provided the ephemeral port is not 5556)
  52. TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (from, "tcp://localhost:5556"));
  53. // This pipe will
  54. TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (from, my_endpoint));
  55. msleep (SETTLE_TIME);
  56. // We send 10 messages, 5 should just get stuck in the queue
  57. // for the not-yet-connected pipe
  58. for (int i = 0; i < 10; ++i) {
  59. send_string_expect_success (from, "Hello", 0);
  60. }
  61. // We now consume from the connected pipe
  62. // - we should see just 5
  63. int timeout = 250;
  64. TEST_ASSERT_SUCCESS_ERRNO (
  65. zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int)));
  66. int seen = 0;
  67. while (true) {
  68. rc = zmq_recv (to, &buffer, sizeof (buffer), 0);
  69. if (rc == -1)
  70. break; // Break when we didn't get a message
  71. seen++;
  72. }
  73. TEST_ASSERT_EQUAL_INT (5, seen);
  74. test_context_socket_close (from);
  75. test_context_socket_close (to);
  76. }
  77. void test_immediate_2 ()
  78. {
  79. // This time we will do the same thing, connect two pipes,
  80. // one of which will succeed in connecting to a bound
  81. // receiver, the other of which will fail. However, we will
  82. // also set the delay attach on connect flag, which should
  83. // cause the pipe attachment to be delayed until the connection
  84. // succeeds.
  85. // Bind the valid socket
  86. void *to = test_context_socket (ZMQ_PULL);
  87. size_t len = MAX_SOCKET_STRING;
  88. char my_endpoint[MAX_SOCKET_STRING];
  89. bind_loopback_ipv4 (to, my_endpoint, len);
  90. int val = 0;
  91. TEST_ASSERT_SUCCESS_ERRNO (
  92. zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof (val)));
  93. // Create a socket pushing to two endpoints - all messages should arrive.
  94. void *from = test_context_socket (ZMQ_PUSH);
  95. val = 0;
  96. TEST_ASSERT_SUCCESS_ERRNO (
  97. zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof (val)));
  98. // Set the key flag
  99. val = 1;
  100. TEST_ASSERT_SUCCESS_ERRNO (
  101. zmq_setsockopt (from, ZMQ_IMMEDIATE, &val, sizeof (val)));
  102. // Connect to the invalid socket
  103. TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (from, "tcp://localhost:5561"));
  104. // Connect to the valid socket
  105. TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (from, my_endpoint));
  106. // Send 10 messages, all should be routed to the connected pipe
  107. for (int i = 0; i < 10; ++i) {
  108. send_string_expect_success (from, "Hello", 0);
  109. }
  110. int timeout = 250;
  111. TEST_ASSERT_SUCCESS_ERRNO (
  112. zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int)));
  113. int seen = 0;
  114. while (true) {
  115. char buffer[16];
  116. int rc = zmq_recv (to, &buffer, sizeof (buffer), 0);
  117. if (rc == -1)
  118. break; // Break when we didn't get a message
  119. seen++;
  120. }
  121. TEST_ASSERT_EQUAL_INT (10, seen);
  122. test_context_socket_close (from);
  123. test_context_socket_close (to);
  124. }
  125. void test_immediate_3 ()
  126. {
  127. // This time we want to validate that the same blocking behaviour
  128. // occurs with an existing connection that is broken. We will send
  129. // messages to a connected pipe, disconnect and verify the messages
  130. // block. Then we reconnect and verify messages flow again.
  131. void *backend = test_context_socket (ZMQ_DEALER);
  132. void *frontend = test_context_socket (ZMQ_DEALER);
  133. int zero = 0;
  134. TEST_ASSERT_SUCCESS_ERRNO (
  135. zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero)));
  136. TEST_ASSERT_SUCCESS_ERRNO (
  137. zmq_setsockopt (frontend, ZMQ_LINGER, &zero, sizeof (zero)));
  138. // Frontend connects to backend using IMMEDIATE
  139. int on = 1;
  140. TEST_ASSERT_SUCCESS_ERRNO (
  141. zmq_setsockopt (frontend, ZMQ_IMMEDIATE, &on, sizeof (on)));
  142. size_t len = MAX_SOCKET_STRING;
  143. char my_endpoint[MAX_SOCKET_STRING];
  144. bind_loopback_ipv4 (backend, my_endpoint, len);
  145. TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (frontend, my_endpoint));
  146. // Ping backend to frontend so we know when the connection is up
  147. send_string_expect_success (backend, "Hello", 0);
  148. recv_string_expect_success (frontend, "Hello", 0);
  149. // Send message from frontend to backend
  150. send_string_expect_success (frontend, "Hello", ZMQ_DONTWAIT);
  151. test_context_socket_close (backend);
  152. // Give time to process disconnect
  153. msleep (SETTLE_TIME * 10);
  154. // Send a message, should fail
  155. TEST_ASSERT_FAILURE_ERRNO (EAGAIN,
  156. zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT));
  157. // Recreate backend socket
  158. backend = test_context_socket (ZMQ_DEALER);
  159. TEST_ASSERT_SUCCESS_ERRNO (
  160. zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero)));
  161. TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (backend, my_endpoint));
  162. // Ping backend to frontend so we know when the connection is up
  163. send_string_expect_success (backend, "Hello", 0);
  164. recv_string_expect_success (frontend, "Hello", 0);
  165. // After the reconnect, should succeed
  166. send_string_expect_success (frontend, "Hello", ZMQ_DONTWAIT);
  167. test_context_socket_close (backend);
  168. test_context_socket_close (frontend);
  169. }
  170. int main (void)
  171. {
  172. setup_test_environment ();
  173. UNITY_BEGIN ();
  174. RUN_TEST (test_immediate_1);
  175. RUN_TEST (test_immediate_2);
  176. RUN_TEST (test_immediate_3);
  177. return UNITY_END ();
  178. }