active_poller.cpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454
  1. #include <zmq_addon.hpp>
  2. #include "testutil.hpp"
  3. #if defined(ZMQ_CPP11) && defined(ZMQ_BUILD_DRAFT_API)
  4. #include <array>
  5. #include <memory>
  6. TEST_CASE("create destroy", "[active_poller]")
  7. {
  8. zmq::active_poller_t active_poller;
  9. CHECK(active_poller.empty());
  10. }
  11. static_assert(!std::is_copy_constructible<zmq::active_poller_t>::value,
  12. "active_active_poller_t should not be copy-constructible");
  13. static_assert(!std::is_copy_assignable<zmq::active_poller_t>::value,
  14. "active_active_poller_t should not be copy-assignable");
  15. TEST_CASE("move construct empty", "[active_poller]")
  16. {
  17. zmq::active_poller_t a;
  18. CHECK(a.empty());
  19. zmq::active_poller_t b = std::move(a);
  20. CHECK(b.empty());
  21. CHECK(0u == a.size());
  22. CHECK(0u == b.size());
  23. }
  24. TEST_CASE("move assign empty", "[active_poller]")
  25. {
  26. zmq::active_poller_t a;
  27. CHECK(a.empty());
  28. zmq::active_poller_t b;
  29. CHECK(b.empty());
  30. b = std::move(a);
  31. CHECK(0u == a.size());
  32. CHECK(0u == b.size());
  33. CHECK(a.empty());
  34. CHECK(b.empty());
  35. }
  36. TEST_CASE("move construct non empty", "[active_poller]")
  37. {
  38. zmq::context_t context;
  39. zmq::socket_t socket{context, zmq::socket_type::router};
  40. zmq::active_poller_t a;
  41. a.add(socket, zmq::event_flags::pollin, [](zmq::event_flags)
  42. {
  43. });
  44. CHECK_FALSE(a.empty());
  45. CHECK(1u == a.size());
  46. zmq::active_poller_t b = std::move(a);
  47. CHECK(a.empty());
  48. CHECK(0u == a.size());
  49. CHECK_FALSE(b.empty());
  50. CHECK(1u == b.size());
  51. }
  52. TEST_CASE("move assign non empty", "[active_poller]")
  53. {
  54. zmq::context_t context;
  55. zmq::socket_t socket{context, zmq::socket_type::router};
  56. zmq::active_poller_t a;
  57. a.add(socket, zmq::event_flags::pollin, [](zmq::event_flags)
  58. {
  59. });
  60. CHECK_FALSE(a.empty());
  61. CHECK(1u == a.size());
  62. zmq::active_poller_t b;
  63. b = std::move(a);
  64. CHECK(a.empty());
  65. CHECK(0u == a.size());
  66. CHECK_FALSE(b.empty());
  67. CHECK(1u == b.size());
  68. }
  69. TEST_CASE("add handler", "[active_poller]")
  70. {
  71. zmq::context_t context;
  72. zmq::socket_t socket{context, zmq::socket_type::router};
  73. zmq::active_poller_t active_poller;
  74. zmq::active_poller_t::handler_type handler;
  75. CHECK_NOTHROW(active_poller.add(socket, zmq::event_flags::pollin, handler));
  76. }
  77. #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 0)
  78. // this behaviour was added by https://github.com/zeromq/libzmq/pull/3100
  79. TEST_CASE("add handler invalid events type", "[active_poller]")
  80. {
  81. zmq::context_t context;
  82. zmq::socket_t socket{context, zmq::socket_type::router};
  83. zmq::active_poller_t active_poller;
  84. zmq::active_poller_t::handler_type handler;
  85. short invalid_events_type = 2 << 10;
  86. CHECK_THROWS_AS(
  87. active_poller.add(socket, static_cast<zmq::event_flags>(invalid_events_type),
  88. handler), const zmq::error_t&);
  89. CHECK(active_poller.empty());
  90. CHECK(0u == active_poller.size());
  91. }
  92. #endif
  93. TEST_CASE("add handler twice throws", "[active_poller]")
  94. {
  95. zmq::context_t context;
  96. zmq::socket_t socket{context, zmq::socket_type::router};
  97. zmq::active_poller_t active_poller;
  98. zmq::active_poller_t::handler_type handler;
  99. active_poller.add(socket, zmq::event_flags::pollin, handler);
  100. /// \todo the actual error code should be checked
  101. CHECK_THROWS_AS(active_poller.add(socket, zmq::event_flags::pollin, handler),
  102. const zmq::error_t&);
  103. }
  104. TEST_CASE("wait with no handlers throws", "[active_poller]")
  105. {
  106. zmq::active_poller_t active_poller;
  107. /// \todo the actual error code should be checked
  108. CHECK_THROWS_AS(active_poller.wait(std::chrono::milliseconds{10}),
  109. const zmq::error_t&);
  110. }
  111. TEST_CASE("remove unregistered throws", "[active_poller]")
  112. {
  113. zmq::context_t context;
  114. zmq::socket_t socket{context, zmq::socket_type::router};
  115. zmq::active_poller_t active_poller;
  116. /// \todo the actual error code should be checked
  117. CHECK_THROWS_AS(active_poller.remove(socket), const zmq::error_t&);
  118. }
  119. TEST_CASE("remove registered empty", "[active_poller]")
  120. {
  121. zmq::context_t context;
  122. zmq::socket_t socket{context, zmq::socket_type::router};
  123. zmq::active_poller_t active_poller;
  124. active_poller.add(socket, zmq::event_flags::pollin,
  125. zmq::active_poller_t::handler_type{});
  126. CHECK_NOTHROW(active_poller.remove(socket));
  127. }
  128. TEST_CASE("remove registered non empty", "[active_poller]")
  129. {
  130. zmq::context_t context;
  131. zmq::socket_t socket{context, zmq::socket_type::router};
  132. zmq::active_poller_t active_poller;
  133. active_poller.add(socket, zmq::event_flags::pollin, [](zmq::event_flags)
  134. {
  135. });
  136. CHECK_NOTHROW(active_poller.remove(socket));
  137. }
  138. namespace
  139. {
  140. struct server_client_setup : common_server_client_setup
  141. {
  142. zmq::active_poller_t::handler_type handler = [&](zmq::event_flags e)
  143. {
  144. events = e;
  145. };
  146. zmq::event_flags events = zmq::event_flags::none;
  147. };
  148. const std::string hi_str = "Hi";
  149. }
  150. TEST_CASE("poll basic", "[active_poller]")
  151. {
  152. server_client_setup s;
  153. CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
  154. zmq::active_poller_t active_poller;
  155. bool message_received = false;
  156. zmq::active_poller_t::handler_type handler = [&message_received
  157. ](zmq::event_flags events)
  158. {
  159. CHECK(zmq::event_flags::none != (events & zmq::event_flags::pollin));
  160. message_received = true;
  161. };
  162. CHECK_NOTHROW(active_poller.add(s.server, zmq::event_flags::pollin, handler));
  163. CHECK(1 == active_poller.wait(std::chrono::milliseconds{-1}));
  164. CHECK(message_received);
  165. }
  166. /// \todo this contains multiple test cases that should be split up
  167. TEST_CASE("client server", "[active_poller]")
  168. {
  169. const std::string send_msg = hi_str;
  170. // Setup server and client
  171. server_client_setup s;
  172. // Setup active_poller
  173. zmq::active_poller_t active_poller;
  174. zmq::event_flags events;
  175. zmq::active_poller_t::handler_type handler = [&](zmq::event_flags e)
  176. {
  177. if (zmq::event_flags::none != (e & zmq::event_flags::pollin)) {
  178. zmq::message_t zmq_msg;
  179. CHECK_NOTHROW(s.server.recv(zmq_msg)); // get message
  180. std::string recv_msg(zmq_msg.data<char>(), zmq_msg.size());
  181. CHECK(send_msg == recv_msg);
  182. } else if (zmq::event_flags::none != (e & ~zmq::event_flags::pollout)) {
  183. INFO("Unexpected event type " << static_cast<short>(events));
  184. REQUIRE(false);
  185. }
  186. events = e;
  187. };
  188. CHECK_NOTHROW(active_poller.add(s.server, zmq::event_flags::pollin, handler));
  189. // client sends message
  190. CHECK_NOTHROW(s.client.send(zmq::message_t{send_msg}, zmq::send_flags::none));
  191. CHECK(1 == active_poller.wait(std::chrono::milliseconds{-1}));
  192. CHECK(events == zmq::event_flags::pollin);
  193. // Re-add server socket with pollout flag
  194. CHECK_NOTHROW(active_poller.remove(s.server));
  195. CHECK_NOTHROW(
  196. active_poller.add(s.server, zmq::event_flags::pollin | zmq::event_flags::
  197. pollout, handler));
  198. CHECK(1 == active_poller.wait(std::chrono::milliseconds{-1}));
  199. CHECK(events == zmq::event_flags::pollout);
  200. }
  201. TEST_CASE("add invalid socket throws", "[active_poller]")
  202. {
  203. zmq::context_t context;
  204. zmq::active_poller_t active_poller;
  205. zmq::socket_t a{context, zmq::socket_type::router};
  206. zmq::socket_t b{std::move(a)};
  207. CHECK_THROWS_AS(
  208. active_poller.add(a, zmq::event_flags::pollin, zmq::active_poller_t::
  209. handler_type{}),
  210. const zmq::error_t&);
  211. }
  212. TEST_CASE("remove invalid socket throws", "[active_poller]")
  213. {
  214. zmq::context_t context;
  215. zmq::socket_t socket{context, zmq::socket_type::router};
  216. zmq::active_poller_t active_poller;
  217. CHECK_NOTHROW(
  218. active_poller.add(socket, zmq::event_flags::pollin, zmq::active_poller_t::
  219. handler_type{}));
  220. CHECK(1u == active_poller.size());
  221. std::vector<zmq::socket_t> sockets;
  222. sockets.emplace_back(std::move(socket));
  223. CHECK_THROWS_AS(active_poller.remove(socket), const zmq::error_t&);
  224. CHECK(1u == active_poller.size());
  225. }
  226. TEST_CASE("wait on added empty handler", "[active_poller]")
  227. {
  228. server_client_setup s;
  229. CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
  230. zmq::active_poller_t active_poller;
  231. zmq::active_poller_t::handler_type handler;
  232. CHECK_NOTHROW(active_poller.add(s.server, zmq::event_flags::pollin, handler));
  233. CHECK_NOTHROW(active_poller.wait(std::chrono::milliseconds{-1}));
  234. }
  235. TEST_CASE("modify empty throws", "[active_poller]")
  236. {
  237. zmq::context_t context;
  238. zmq::socket_t socket{context, zmq::socket_type::push};
  239. zmq::active_poller_t active_poller;
  240. CHECK_THROWS_AS(active_poller.modify(socket, zmq::event_flags::pollin),
  241. const zmq::error_t&);
  242. }
  243. TEST_CASE("modify invalid socket throws", "[active_poller]")
  244. {
  245. zmq::context_t context;
  246. zmq::socket_t a{context, zmq::socket_type::push};
  247. zmq::socket_t b{std::move(a)};
  248. zmq::active_poller_t active_poller;
  249. CHECK_THROWS_AS(active_poller.modify(a, zmq::event_flags::pollin),
  250. const zmq::error_t&);
  251. }
  252. TEST_CASE("modify not added throws", "[active_poller]")
  253. {
  254. zmq::context_t context;
  255. zmq::socket_t a{context, zmq::socket_type::push};
  256. zmq::socket_t b{context, zmq::socket_type::push};
  257. zmq::active_poller_t active_poller;
  258. CHECK_NOTHROW(
  259. active_poller.add(a, zmq::event_flags::pollin, zmq::active_poller_t::
  260. handler_type{}));
  261. CHECK_THROWS_AS(active_poller.modify(b, zmq::event_flags::pollin),
  262. const zmq::error_t&);
  263. }
  264. TEST_CASE("modify simple", "[active_poller]")
  265. {
  266. zmq::context_t context;
  267. zmq::socket_t a{context, zmq::socket_type::push};
  268. zmq::active_poller_t active_poller;
  269. CHECK_NOTHROW(
  270. active_poller.add(a, zmq::event_flags::pollin, zmq::active_poller_t::
  271. handler_type{}));
  272. CHECK_NOTHROW(
  273. active_poller.modify(a, zmq::event_flags::pollin | zmq::event_flags::pollout
  274. ));
  275. }
  276. TEST_CASE("poll client server", "[active_poller]")
  277. {
  278. // Setup server and client
  279. server_client_setup s;
  280. // Setup active_poller
  281. zmq::active_poller_t active_poller;
  282. CHECK_NOTHROW(active_poller.add(s.server, zmq::event_flags::pollin, s.handler));
  283. // client sends message
  284. CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
  285. // wait for message and verify events
  286. CHECK_NOTHROW(active_poller.wait(std::chrono::milliseconds{500}));
  287. CHECK(s.events == zmq::event_flags::pollin);
  288. // Modify server socket with pollout flag
  289. CHECK_NOTHROW(
  290. active_poller.modify(s.server, zmq::event_flags::pollin | zmq::event_flags::
  291. pollout));
  292. CHECK(1 == active_poller.wait(std::chrono::milliseconds{500}));
  293. CHECK(s.events == (zmq::event_flags::pollin | zmq::event_flags::pollout));
  294. }
  295. TEST_CASE("wait one return", "[active_poller]")
  296. {
  297. // Setup server and client
  298. server_client_setup s;
  299. int count = 0;
  300. // Setup active_poller
  301. zmq::active_poller_t active_poller;
  302. CHECK_NOTHROW(
  303. active_poller.add(s.server, zmq::event_flags::pollin, [&count](zmq::
  304. event_flags) { ++count; }));
  305. // client sends message
  306. CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
  307. // wait for message and verify events
  308. CHECK(1 == active_poller.wait(std::chrono::milliseconds{500}));
  309. CHECK(1u == count);
  310. }
  311. TEST_CASE("wait on move constructed active_poller", "[active_poller]")
  312. {
  313. server_client_setup s;
  314. CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
  315. zmq::active_poller_t a;
  316. zmq::active_poller_t::handler_type handler;
  317. CHECK_NOTHROW(a.add(s.server, zmq::event_flags::pollin, handler));
  318. zmq::active_poller_t b{std::move(a)};
  319. CHECK(1u == b.size());
  320. /// \todo the actual error code should be checked
  321. CHECK_THROWS_AS(a.wait(std::chrono::milliseconds{10}), const zmq::error_t&);
  322. CHECK(b.wait(std::chrono::milliseconds{-1}));
  323. }
  324. TEST_CASE("wait on move assigned active_poller", "[active_poller]")
  325. {
  326. server_client_setup s;
  327. CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
  328. zmq::active_poller_t a;
  329. zmq::active_poller_t::handler_type handler;
  330. CHECK_NOTHROW(a.add(s.server, zmq::event_flags::pollin, handler));
  331. zmq::active_poller_t b;
  332. b = {std::move(a)};
  333. CHECK(1u == b.size());
  334. /// \todo the actual error code should be checked
  335. CHECK_THROWS_AS(a.wait(std::chrono::milliseconds{10}), const zmq::error_t&);
  336. CHECK(b.wait(std::chrono::milliseconds{-1}));
  337. }
  338. TEST_CASE("received on move constructed active_poller", "[active_poller]")
  339. {
  340. // Setup server and client
  341. server_client_setup s;
  342. int count = 0;
  343. // Setup active_poller a
  344. zmq::active_poller_t a;
  345. CHECK_NOTHROW(
  346. a.add(s.server, zmq::event_flags::pollin, [&count](zmq::event_flags) { ++
  347. count; }));
  348. // client sends message
  349. CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
  350. // wait for message and verify it is received
  351. CHECK(1 == a.wait(std::chrono::milliseconds{500}));
  352. CHECK(1u == count);
  353. // Move construct active_poller b
  354. zmq::active_poller_t b{std::move(a)};
  355. // client sends message again
  356. CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
  357. // wait for message and verify it is received
  358. CHECK(1 == b.wait(std::chrono::milliseconds{500}));
  359. CHECK(2u == count);
  360. }
  361. TEST_CASE("remove from handler", "[active_poller]")
  362. {
  363. constexpr size_t ITER_NO = 10;
  364. // Setup servers and clients
  365. std::vector<server_client_setup> setup_list;
  366. for (size_t i = 0; i < ITER_NO; ++i)
  367. setup_list.emplace_back(server_client_setup{});
  368. // Setup active_poller
  369. zmq::active_poller_t active_poller;
  370. int count = 0;
  371. for (size_t i = 0; i < ITER_NO; ++i) {
  372. CHECK_NOTHROW(
  373. active_poller.add(setup_list[i].server, zmq::event_flags::pollin, [&, i](
  374. zmq::event_flags events) {
  375. CHECK(events == zmq::event_flags::pollin);
  376. active_poller.remove(setup_list[ITER_NO - i - 1].server);
  377. CHECK((ITER_NO - i - 1) == active_poller.size());
  378. }));
  379. ++count;
  380. }
  381. CHECK(ITER_NO == active_poller.size());
  382. // Clients send messages
  383. for (auto &s : setup_list) {
  384. CHECK_NOTHROW(s.client.send(zmq::message_t{hi_str}, zmq::send_flags::none));
  385. }
  386. // Wait for all servers to receive a message
  387. for (auto &s : setup_list) {
  388. zmq::pollitem_t items[] = {{s.server, 0, ZMQ_POLLIN, 0}};
  389. zmq::poll(&items[0], 1);
  390. }
  391. // Fire all handlers in one wait
  392. CHECK(ITER_NO == active_poller.wait(std::chrono::milliseconds{-1}));
  393. CHECK(ITER_NO == count);
  394. }
  395. #endif