// poller.cpp -- unit tests for zmq::poller_t and zmq::poller_event
  1. #include "testutil.hpp"
  2. #if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)
  3. #include <array>
  4. #include <memory>
  5. #ifdef ZMQ_CPP17
  6. static_assert(std::is_nothrow_swappable_v<zmq::poller_t<>>);
  7. #endif
  8. static_assert(sizeof(zmq_poller_event_t) == sizeof(zmq::poller_event<>), "");
  9. static_assert(sizeof(zmq_poller_event_t) == sizeof(zmq::poller_event<zmq::
  10. no_user_data>), "");
  11. static_assert(sizeof(zmq_poller_event_t) == sizeof(zmq::poller_event<int>), "");
  12. static_assert(alignof(zmq_poller_event_t) == alignof(zmq::poller_event<>), "");
  13. static_assert(alignof(zmq_poller_event_t) == alignof(zmq::poller_event<int>), "");
  14. static_assert(!std::is_copy_constructible<zmq::poller_t<>>::value,
  15. "poller_t should not be copy-constructible");
  16. static_assert(!std::is_copy_assignable<zmq::poller_t<>>::value,
  17. "poller_t should not be copy-assignable");
  18. TEST_CASE("event flags", "[poller]")
  19. {
  20. CHECK((zmq::event_flags::pollin | zmq::event_flags::pollout)
  21. == static_cast<zmq::event_flags>(ZMQ_POLLIN | ZMQ_POLLOUT));
  22. CHECK((zmq::event_flags::pollin & zmq::event_flags::pollout)
  23. == static_cast<zmq::event_flags>(ZMQ_POLLIN & ZMQ_POLLOUT));
  24. CHECK((zmq::event_flags::pollin ^ zmq::event_flags::pollout)
  25. == static_cast<zmq::event_flags>(ZMQ_POLLIN ^ ZMQ_POLLOUT));
  26. CHECK(~zmq::event_flags::pollin == static_cast<zmq::event_flags>(~ZMQ_POLLIN));
  27. }
  28. TEST_CASE("poller create destroy", "[poller]")
  29. {
  30. zmq::poller_t<> a;
  31. #ifdef ZMQ_CPP17 // CTAD
  32. zmq::poller_t b;
  33. zmq::poller_event e;
  34. #endif
  35. }
  36. TEST_CASE("poller move construct empty", "[poller]")
  37. {
  38. zmq::poller_t<> a;
  39. zmq::poller_t<> b = std::move(a);
  40. }
  41. TEST_CASE("poller move assign empty", "[poller]")
  42. {
  43. zmq::poller_t<> a;
  44. zmq::poller_t<> b;
  45. b = std::move(a);
  46. }
  47. TEST_CASE("poller swap", "[poller]")
  48. {
  49. zmq::poller_t<> a;
  50. zmq::poller_t<> b;
  51. using std::swap;
  52. swap(a, b);
  53. }
  54. TEST_CASE("poller move construct non empty", "[poller]")
  55. {
  56. zmq::context_t context;
  57. zmq::socket_t socket{context, zmq::socket_type::router};
  58. zmq::poller_t<> a;
  59. a.add(socket, zmq::event_flags::pollin);
  60. zmq::poller_t<> b = std::move(a);
  61. }
  62. TEST_CASE("poller move assign non empty", "[poller]")
  63. {
  64. zmq::context_t context;
  65. zmq::socket_t socket{context, zmq::socket_type::router};
  66. zmq::poller_t<> a;
  67. a.add(socket, zmq::event_flags::pollin);
  68. zmq::poller_t<> b;
  69. b = std::move(a);
  70. }
  71. TEST_CASE("poller add nullptr", "[poller]")
  72. {
  73. zmq::context_t context;
  74. zmq::socket_t socket{context, zmq::socket_type::router};
  75. zmq::poller_t<void> poller;
  76. CHECK_NOTHROW(poller.add(socket, zmq::event_flags::pollin, nullptr));
  77. }
  78. TEST_CASE("poller add non nullptr", "[poller]")
  79. {
  80. zmq::context_t context;
  81. zmq::socket_t socket{context, zmq::socket_type::router};
  82. zmq::poller_t<int> poller;
  83. int i;
  84. CHECK_NOTHROW(poller.add(socket, zmq::event_flags::pollin, &i));
  85. }
  86. #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 0)
  87. // this behaviour was added by https://github.com/zeromq/libzmq/pull/3100
  88. TEST_CASE("poller add handler invalid events type", "[poller]")
  89. {
  90. zmq::context_t context;
  91. zmq::socket_t socket{context, zmq::socket_type::router};
  92. zmq::poller_t<> poller;
  93. short invalid_events_type = 2 << 10;
  94. CHECK_THROWS_AS(
  95. poller.add(socket, static_cast<zmq::event_flags>(invalid_events_type)),
  96. const zmq::error_t&);
  97. }
  98. #endif
  99. TEST_CASE("poller add handler twice throws", "[poller]")
  100. {
  101. zmq::context_t context;
  102. zmq::socket_t socket{context, zmq::socket_type::router};
  103. zmq::poller_t<> poller;
  104. poller.add(socket, zmq::event_flags::pollin);
  105. /// \todo the actual error code should be checked
  106. CHECK_THROWS_AS(poller.add(socket, zmq::event_flags::pollin),
  107. const zmq::error_t&);
  108. }
  109. TEST_CASE("poller wait with no handlers throws", "[poller]")
  110. {
  111. zmq::poller_t<> poller;
  112. std::vector<zmq::poller_event<>> events;
  113. /// \todo the actual error code should be checked
  114. CHECK_THROWS_AS(poller.wait_all(events, std::chrono::milliseconds{10}),
  115. const zmq::error_t&);
  116. }
  117. TEST_CASE("poller remove unregistered throws", "[poller]")
  118. {
  119. zmq::context_t context;
  120. zmq::socket_t socket{context, zmq::socket_type::router};
  121. zmq::poller_t<> poller;
  122. /// \todo the actual error code should be checked
  123. CHECK_THROWS_AS(poller.remove(socket), const zmq::error_t&);
  124. }
  125. TEST_CASE("poller remove registered empty", "[poller]")
  126. {
  127. zmq::context_t context;
  128. zmq::socket_t socket{context, zmq::socket_type::router};
  129. zmq::poller_t<> poller;
  130. poller.add(socket, zmq::event_flags::pollin);
  131. CHECK_NOTHROW(poller.remove(socket));
  132. }
  133. TEST_CASE("poller remove registered non empty", "[poller]")
  134. {
  135. zmq::context_t context;
  136. zmq::socket_t socket{context, zmq::socket_type::router};
  137. zmq::poller_t<int> poller;
  138. int empty{};
  139. poller.add(socket, zmq::event_flags::pollin, &empty);
  140. CHECK_NOTHROW(poller.remove(socket));
  141. }
  // Message payload sent by the client side in the polling tests below.
  const std::string hi_str("Hi");
  143. TEST_CASE("poller poll basic", "[poller]")
  144. {
  145. common_server_client_setup s;
  146. CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
  147. zmq::poller_t<int> poller;
  148. std::vector<zmq::poller_event<int>> events{1};
  149. int i = 0;
  150. CHECK_NOTHROW(poller.add(s.server, zmq::event_flags::pollin, &i));
  151. CHECK(1 == poller.wait_all(events, std::chrono::milliseconds{-1}));
  152. CHECK(s.server == events[0].socket);
  153. CHECK(&i == events[0].user_data);
  154. }
  155. TEST_CASE("poller add invalid socket throws", "[poller]")
  156. {
  157. zmq::context_t context;
  158. zmq::poller_t<> poller;
  159. zmq::socket_t a{context, zmq::socket_type::router};
  160. zmq::socket_t b{std::move(a)};
  161. CHECK_THROWS_AS(poller.add(a, zmq::event_flags::pollin), const zmq::error_t&);
  162. }
  163. TEST_CASE("poller remove invalid socket throws", "[poller]")
  164. {
  165. zmq::context_t context;
  166. zmq::socket_t socket{context, zmq::socket_type::router};
  167. zmq::poller_t<> poller;
  168. CHECK_NOTHROW(poller.add(socket, zmq::event_flags::pollin));
  169. std::vector<zmq::socket_t> sockets;
  170. sockets.emplace_back(std::move(socket));
  171. CHECK_THROWS_AS(poller.remove(socket), const zmq::error_t&);
  172. CHECK_NOTHROW(poller.remove(sockets[0]));
  173. }
  174. TEST_CASE("poller modify empty throws", "[poller]")
  175. {
  176. zmq::context_t context;
  177. zmq::socket_t socket{context, zmq::socket_type::push};
  178. zmq::poller_t<> poller;
  179. CHECK_THROWS_AS(poller.modify(socket, zmq::event_flags::pollin),
  180. const zmq::error_t&);
  181. }
  182. TEST_CASE("poller modify invalid socket throws", "[poller]")
  183. {
  184. zmq::context_t context;
  185. zmq::socket_t a{context, zmq::socket_type::push};
  186. zmq::socket_t b{std::move(a)};
  187. zmq::poller_t<> poller;
  188. CHECK_THROWS_AS(poller.modify(a, zmq::event_flags::pollin), const zmq::error_t&);
  189. }
  190. TEST_CASE("poller modify not added throws", "[poller]")
  191. {
  192. zmq::context_t context;
  193. zmq::socket_t a{context, zmq::socket_type::push};
  194. zmq::socket_t b{context, zmq::socket_type::push};
  195. zmq::poller_t<> poller;
  196. CHECK_NOTHROW(poller.add(a, zmq::event_flags::pollin));
  197. CHECK_THROWS_AS(poller.modify(b, zmq::event_flags::pollin), const zmq::error_t&);
  198. }
  199. TEST_CASE("poller modify simple", "[poller]")
  200. {
  201. zmq::context_t context;
  202. zmq::socket_t a{context, zmq::socket_type::push};
  203. zmq::poller_t<> poller;
  204. CHECK_NOTHROW(poller.add(a, zmq::event_flags::pollin));
  205. CHECK_NOTHROW(
  206. poller.modify(a, zmq::event_flags::pollin | zmq::event_flags::pollout));
  207. }
  208. TEST_CASE("poller poll client server", "[poller]")
  209. {
  210. // Setup server and client
  211. common_server_client_setup s;
  212. // Setup poller
  213. zmq::poller_t<zmq::socket_t> poller;
  214. CHECK_NOTHROW(poller.add(s.server, zmq::event_flags::pollin, &s.server));
  215. // client sends message
  216. CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
  217. // wait for message and verify events
  218. std::vector<zmq::poller_event<zmq::socket_t>> events(1);
  219. CHECK(1 == poller.wait_all(events, std::chrono::milliseconds{500}));
  220. CHECK(zmq::event_flags::pollin == events[0].events);
  221. // Modify server socket with pollout flag
  222. CHECK_NOTHROW(
  223. poller.modify(s.server, zmq::event_flags::pollin | zmq::event_flags::pollout
  224. ));
  225. CHECK(1 == poller.wait_all(events, std::chrono::milliseconds{500}));
  226. CHECK((zmq::event_flags::pollin | zmq::event_flags::pollout) == events[0].events)
  227. ;
  228. }
  229. TEST_CASE("poller wait one return", "[poller]")
  230. {
  231. // Setup server and client
  232. common_server_client_setup s;
  233. // Setup poller
  234. zmq::poller_t<> poller;
  235. CHECK_NOTHROW(poller.add(s.server, zmq::event_flags::pollin));
  236. // client sends message
  237. CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
  238. // wait for message and verify events
  239. std::vector<zmq::poller_event<>> events(1);
  240. CHECK(1 == poller.wait_all(events, std::chrono::milliseconds{500}));
  241. }
  242. TEST_CASE("poller wait on move constructed", "[poller]")
  243. {
  244. common_server_client_setup s;
  245. CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
  246. zmq::poller_t<> a;
  247. CHECK_NOTHROW(a.add(s.server, zmq::event_flags::pollin));
  248. zmq::poller_t<> b{std::move(a)};
  249. std::vector<zmq::poller_event<>> events(1);
  250. /// \todo the actual error code should be checked
  251. CHECK_THROWS_AS(a.wait_all(events, std::chrono::milliseconds{10}),
  252. const zmq::error_t&);
  253. CHECK(1 == b.wait_all(events, std::chrono::milliseconds{-1}));
  254. }
  255. TEST_CASE("poller wait on move assigned", "[poller]")
  256. {
  257. common_server_client_setup s;
  258. CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
  259. zmq::poller_t<> a;
  260. CHECK_NOTHROW(a.add(s.server, zmq::event_flags::pollin));
  261. zmq::poller_t<> b;
  262. b = {std::move(a)};
  263. /// \todo the TEST_CASE error code should be checked
  264. std::vector<zmq::poller_event<>> events(1);
  265. CHECK_THROWS_AS(a.wait_all(events, std::chrono::milliseconds{10}),
  266. const zmq::error_t&);
  267. CHECK(1 == b.wait_all(events, std::chrono::milliseconds{-1}));
  268. }
  269. TEST_CASE("poller remove from handler", "[poller]")
  270. {
  271. constexpr size_t ITER_NO = 10;
  272. // Setup servers and clients
  273. std::vector<common_server_client_setup> setup_list;
  274. for (size_t i = 0; i < ITER_NO; ++i)
  275. setup_list.emplace_back(common_server_client_setup{});
  276. // Setup poller
  277. zmq::poller_t<> poller;
  278. for (size_t i = 0; i < ITER_NO; ++i) {
  279. CHECK_NOTHROW(poller.add(setup_list[i].server, zmq::event_flags::pollin));
  280. }
  281. // Clients send messages
  282. for (auto &s : setup_list) {
  283. CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
  284. }
  285. // Wait for all servers to receive a message
  286. for (auto &s : setup_list) {
  287. zmq::pollitem_t items[] = {{s.server, 0, ZMQ_POLLIN, 0}};
  288. zmq::poll(&items[0], 1);
  289. }
  290. // Fire all handlers in one wait
  291. std::vector<zmq::poller_event<>> events(ITER_NO);
  292. CHECK(ITER_NO == poller.wait_all(events, std::chrono::milliseconds{-1}));
  293. }
  294. #endif