unittest_poller.cpp 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. /*
  2. Copyright (c) 2018 Contributors as noted in the AUTHORS file
  3. This file is part of 0MQ.
  4. 0MQ is free software; you can redistribute it and/or modify it under
  5. the terms of the GNU Lesser General Public License as published by
  6. the Free Software Foundation; either version 3 of the License, or
  7. (at your option) any later version.
  8. 0MQ is distributed in the hope that it will be useful,
  9. but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. GNU Lesser General Public License for more details.
  12. You should have received a copy of the GNU Lesser General Public License
  13. along with this program. If not, see <http://www.gnu.org/licenses/>.
  14. */
  15. #include "../tests/testutil.hpp"
  16. #include <poller.hpp>
  17. #include <i_poll_events.hpp>
  18. #include <ip.hpp>
  19. #include <unity.h>
  20. #ifndef _WIN32
  21. #include <unistd.h>
  22. #define closesocket close
  23. #endif
  24. void setUp ()
  25. {
  26. }
  27. void tearDown ()
  28. {
  29. }
  30. void test_create ()
  31. {
  32. zmq::thread_ctx_t thread_ctx;
  33. zmq::poller_t poller (thread_ctx);
  34. }
  35. #if 0
  36. // TODO this triggers an assertion. should it be a valid use case?
  37. void test_start_empty ()
  38. {
  39. zmq::thread_ctx_t thread_ctx;
  40. zmq::poller_t poller (thread_ctx);
  41. poller.start ();
  42. msleep (SETTLE_TIME);
  43. }
  44. #endif
  45. struct test_events_t : zmq::i_poll_events
  46. {
  47. test_events_t (zmq::fd_t fd_, zmq::poller_t &poller_) :
  48. _fd (fd_),
  49. _poller (poller_)
  50. {
  51. (void) _fd;
  52. }
  53. void in_event () ZMQ_OVERRIDE
  54. {
  55. _poller.rm_fd (_handle);
  56. _handle = (zmq::poller_t::handle_t) NULL;
  57. // this must only be incremented after rm_fd
  58. in_events.add (1);
  59. }
  60. void out_event () ZMQ_OVERRIDE
  61. {
  62. // TODO
  63. }
  64. void timer_event (int id_) ZMQ_OVERRIDE
  65. {
  66. LIBZMQ_UNUSED (id_);
  67. _poller.rm_fd (_handle);
  68. _handle = (zmq::poller_t::handle_t) NULL;
  69. // this must only be incremented after rm_fd
  70. timer_events.add (1);
  71. }
  72. void set_handle (zmq::poller_t::handle_t handle_) { _handle = handle_; }
  73. zmq::atomic_counter_t in_events, timer_events;
  74. private:
  75. zmq::fd_t _fd;
  76. zmq::poller_t &_poller;
  77. zmq::poller_t::handle_t _handle;
  78. };
  79. void wait_in_events (test_events_t &events_)
  80. {
  81. void *watch = zmq_stopwatch_start ();
  82. while (events_.in_events.get () < 1) {
  83. msleep (1);
  84. #ifdef ZMQ_BUILD_DRAFT
  85. TEST_ASSERT_LESS_OR_EQUAL_MESSAGE (SETTLE_TIME,
  86. zmq_stopwatch_intermediate (watch),
  87. "Timeout waiting for in event");
  88. #endif
  89. }
  90. zmq_stopwatch_stop (watch);
  91. }
  92. void wait_timer_events (test_events_t &events_)
  93. {
  94. void *watch = zmq_stopwatch_start ();
  95. while (events_.timer_events.get () < 1) {
  96. msleep (1);
  97. #ifdef ZMQ_BUILD_DRAFT
  98. TEST_ASSERT_LESS_OR_EQUAL_MESSAGE (SETTLE_TIME,
  99. zmq_stopwatch_intermediate (watch),
  100. "Timeout waiting for timer event");
  101. #endif
  102. }
  103. zmq_stopwatch_stop (watch);
  104. }
  105. void create_nonblocking_fdpair (zmq::fd_t *r_, zmq::fd_t *w_)
  106. {
  107. int rc = zmq::make_fdpair (r_, w_);
  108. TEST_ASSERT_EQUAL_INT (0, rc);
  109. TEST_ASSERT_NOT_EQUAL (zmq::retired_fd, *r_);
  110. TEST_ASSERT_NOT_EQUAL (zmq::retired_fd, *w_);
  111. zmq::unblock_socket (*r_);
  112. zmq::unblock_socket (*w_);
  113. }
  114. void send_signal (zmq::fd_t w_)
  115. {
  116. #if defined ZMQ_HAVE_EVENTFD
  117. const uint64_t inc = 1;
  118. ssize_t sz = write (w_, &inc, sizeof (inc));
  119. assert (sz == sizeof (inc));
  120. #else
  121. {
  122. char msg[] = "test";
  123. int rc = send (w_, msg, sizeof (msg), 0);
  124. assert (rc == sizeof (msg));
  125. }
  126. #endif
  127. }
  128. void close_fdpair (zmq::fd_t w_, zmq::fd_t r_)
  129. {
  130. int rc = closesocket (w_);
  131. TEST_ASSERT_EQUAL_INT (0, rc);
  132. #if !defined ZMQ_HAVE_EVENTFD
  133. rc = closesocket (r_);
  134. TEST_ASSERT_EQUAL_INT (0, rc);
  135. #else
  136. LIBZMQ_UNUSED (r_);
  137. #endif
  138. }
  139. void test_add_fd_and_start_and_receive_data ()
  140. {
  141. zmq::thread_ctx_t thread_ctx;
  142. zmq::poller_t poller (thread_ctx);
  143. zmq::fd_t r, w;
  144. create_nonblocking_fdpair (&r, &w);
  145. test_events_t events (r, poller);
  146. zmq::poller_t::handle_t handle = poller.add_fd (r, &events);
  147. events.set_handle (handle);
  148. poller.set_pollin (handle);
  149. poller.start ();
  150. send_signal (w);
  151. wait_in_events (events);
  152. // required cleanup
  153. close_fdpair (w, r);
  154. }
  155. void test_add_fd_and_remove_by_timer ()
  156. {
  157. zmq::fd_t r, w;
  158. create_nonblocking_fdpair (&r, &w);
  159. zmq::thread_ctx_t thread_ctx;
  160. zmq::poller_t poller (thread_ctx);
  161. test_events_t events (r, poller);
  162. zmq::poller_t::handle_t handle = poller.add_fd (r, &events);
  163. events.set_handle (handle);
  164. poller.add_timer (50, &events, 0);
  165. poller.start ();
  166. wait_timer_events (events);
  167. // required cleanup
  168. close_fdpair (w, r);
  169. }
  170. #ifdef _WIN32
  171. void test_add_fd_with_pending_failing_connect ()
  172. {
  173. zmq::thread_ctx_t thread_ctx;
  174. zmq::poller_t poller (thread_ctx);
  175. zmq::fd_t bind_socket = socket (AF_INET, SOCK_STREAM, 0);
  176. sockaddr_in addr = {0};
  177. addr.sin_family = AF_INET;
  178. addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
  179. addr.sin_port = 0;
  180. TEST_ASSERT_EQUAL_INT (0, bind (bind_socket,
  181. reinterpret_cast<const sockaddr *> (&addr),
  182. sizeof (addr)));
  183. int addr_len = static_cast<int> (sizeof (addr));
  184. TEST_ASSERT_EQUAL_INT (0, getsockname (bind_socket,
  185. reinterpret_cast<sockaddr *> (&addr),
  186. &addr_len));
  187. zmq::fd_t connect_socket = socket (AF_INET, SOCK_STREAM, 0);
  188. zmq::unblock_socket (connect_socket);
  189. TEST_ASSERT_EQUAL_INT (
  190. -1, connect (connect_socket, reinterpret_cast<const sockaddr *> (&addr),
  191. sizeof (addr)));
  192. TEST_ASSERT_EQUAL_INT (WSAEWOULDBLOCK, WSAGetLastError ());
  193. test_events_t events (connect_socket, poller);
  194. zmq::poller_t::handle_t handle = poller.add_fd (connect_socket, &events);
  195. events.set_handle (handle);
  196. poller.set_pollin (handle);
  197. poller.start ();
  198. wait_in_events (events);
  199. int value;
  200. int value_len = sizeof (value);
  201. TEST_ASSERT_EQUAL_INT (0, getsockopt (connect_socket, SOL_SOCKET, SO_ERROR,
  202. reinterpret_cast<char *> (&value),
  203. &value_len));
  204. TEST_ASSERT_EQUAL_INT (WSAECONNREFUSED, value);
  205. // required cleanup
  206. close (connect_socket);
  207. close (bind_socket);
  208. }
  209. #endif
  210. int main (void)
  211. {
  212. UNITY_BEGIN ();
  213. zmq::initialize_network ();
  214. setup_test_environment ();
  215. RUN_TEST (test_create);
  216. RUN_TEST (test_add_fd_and_start_and_receive_data);
  217. RUN_TEST (test_add_fd_and_remove_by_timer);
  218. #if defined _WIN32
  219. RUN_TEST (test_add_fd_with_pending_failing_connect);
  220. #endif
  221. zmq::shutdown_network ();
  222. return UNITY_END ();
  223. }