test_monitor.cpp 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463
  1. /*
  2. Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
  3. This file is part of libzmq, the ZeroMQ core engine in C++.
  4. libzmq is free software; you can redistribute it and/or modify it under
  5. the terms of the GNU Lesser General Public License (LGPL) as published
  6. by the Free Software Foundation; either version 3 of the License, or
  7. (at your option) any later version.
  8. As a special exception, the Contributors give you permission to link
  9. this library with independent modules to produce an executable,
  10. regardless of the license terms of these independent modules, and to
  11. copy and distribute the resulting executable under terms of your choice,
  12. provided that you also meet, for each linked independent module, the
  13. terms and conditions of the license of that module. An independent
  14. module is a module which is not derived from or based on this library.
  15. If you modify this library, you must extend this exception to your
  16. version of the library.
  17. libzmq is distributed in the hope that it will be useful, but WITHOUT
  18. ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  19. FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
  20. License for more details.
  21. You should have received a copy of the GNU Lesser General Public License
  22. along with this program. If not, see <http://www.gnu.org/licenses/>.
  23. */
  24. #include "testutil.hpp"
  25. #include "testutil_monitoring.hpp"
  26. #include "testutil_unity.hpp"
  27. #include <stdlib.h>
  28. #include <string.h>
  29. SETUP_TEARDOWN_TESTCONTEXT
  30. void test_monitor_invalid_protocol_fails ()
  31. {
  32. void *client = test_context_socket (ZMQ_DEALER);
  33. // Socket monitoring only works over inproc://
  34. TEST_ASSERT_FAILURE_ERRNO (
  35. EPROTONOSUPPORT, zmq_socket_monitor (client, "tcp://127.0.0.1:*", 0));
  36. #ifdef ZMQ_EVENT_PIPES_STATS
  37. // Stats command needs to be called on a valid socket with monitoring
  38. // enabled
  39. TEST_ASSERT_FAILURE_ERRNO (ENOTSOCK, zmq_socket_monitor_pipes_stats (NULL));
  40. TEST_ASSERT_FAILURE_ERRNO (EINVAL, zmq_socket_monitor_pipes_stats (client));
  41. #endif
  42. test_context_socket_close_zero_linger (client);
  43. }
  44. void test_monitor_basic ()
  45. {
  46. char my_endpoint[MAX_SOCKET_STRING];
  47. // We'll monitor these two sockets
  48. void *client = test_context_socket (ZMQ_DEALER);
  49. void *server = test_context_socket (ZMQ_DEALER);
  50. // Monitor all events on client and server sockets
  51. TEST_ASSERT_SUCCESS_ERRNO (
  52. zmq_socket_monitor (client, "inproc://monitor-client", ZMQ_EVENT_ALL));
  53. TEST_ASSERT_SUCCESS_ERRNO (
  54. zmq_socket_monitor (server, "inproc://monitor-server", ZMQ_EVENT_ALL));
  55. // Create two sockets for collecting monitor events
  56. void *client_mon = test_context_socket (ZMQ_PAIR);
  57. void *server_mon = test_context_socket (ZMQ_PAIR);
  58. // Connect these to the inproc endpoints so they'll get events
  59. TEST_ASSERT_SUCCESS_ERRNO (
  60. zmq_connect (client_mon, "inproc://monitor-client"));
  61. TEST_ASSERT_SUCCESS_ERRNO (
  62. zmq_connect (server_mon, "inproc://monitor-server"));
  63. // Now do a basic ping test
  64. bind_loopback_ipv4 (server, my_endpoint, sizeof my_endpoint);
  65. TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (client, my_endpoint));
  66. bounce (server, client);
  67. // Close client and server
  68. // TODO why does this use zero_linger?
  69. test_context_socket_close_zero_linger (client);
  70. test_context_socket_close_zero_linger (server);
  71. // Now collect and check events from both sockets
  72. int event = get_monitor_event (client_mon, NULL, NULL);
  73. if (event == ZMQ_EVENT_CONNECT_DELAYED)
  74. event = get_monitor_event (client_mon, NULL, NULL);
  75. TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_CONNECTED, event);
  76. expect_monitor_event (client_mon, ZMQ_EVENT_HANDSHAKE_SUCCEEDED);
  77. event = get_monitor_event (client_mon, NULL, NULL);
  78. if (event == ZMQ_EVENT_DISCONNECTED) {
  79. expect_monitor_event (client_mon, ZMQ_EVENT_CONNECT_RETRIED);
  80. expect_monitor_event (client_mon, ZMQ_EVENT_MONITOR_STOPPED);
  81. } else
  82. TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_MONITOR_STOPPED, event);
  83. // This is the flow of server events
  84. expect_monitor_event (server_mon, ZMQ_EVENT_LISTENING);
  85. expect_monitor_event (server_mon, ZMQ_EVENT_ACCEPTED);
  86. expect_monitor_event (server_mon, ZMQ_EVENT_HANDSHAKE_SUCCEEDED);
  87. event = get_monitor_event (server_mon, NULL, NULL);
  88. // Sometimes the server sees the client closing before it gets closed.
  89. if (event != ZMQ_EVENT_DISCONNECTED) {
  90. TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_CLOSED, event);
  91. event = get_monitor_event (server_mon, NULL, NULL);
  92. }
  93. if (event != ZMQ_EVENT_DISCONNECTED) {
  94. TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_MONITOR_STOPPED, event);
  95. }
  96. // TODO: When not waiting until the monitor stopped, the I/O thread runs
  97. // into some deadlock. This must be fixed, but until it is fixed, we wait
  98. // here in order to have more reliable test execution.
  99. while (event != ZMQ_EVENT_MONITOR_STOPPED) {
  100. event = get_monitor_event (server_mon, NULL, NULL);
  101. }
  102. // Close down the sockets
  103. // TODO why does this use zero_linger?
  104. test_context_socket_close_zero_linger (client_mon);
  105. test_context_socket_close_zero_linger (server_mon);
  106. }
  107. #if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) \
  108. || (defined ZMQ_CURRENT_EVENT_VERSION \
  109. && ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2)
  110. void test_monitor_versioned_invalid_socket_type ()
  111. {
  112. void *client = test_context_socket (ZMQ_DEALER);
  113. // Socket monitoring only works with ZMQ_PAIR, ZMQ_PUB and ZMQ_PUSH.
  114. TEST_ASSERT_FAILURE_ERRNO (
  115. EINVAL, zmq_socket_monitor_versioned (
  116. client, "inproc://invalid-socket-type", 0, 2, ZMQ_CLIENT));
  117. test_context_socket_close_zero_linger (client);
  118. }
  119. void test_monitor_versioned_basic (bind_function_t bind_function_,
  120. const char *expected_prefix_,
  121. int type_)
  122. {
  123. char server_endpoint[MAX_SOCKET_STRING];
  124. char client_mon_endpoint[MAX_SOCKET_STRING];
  125. char server_mon_endpoint[MAX_SOCKET_STRING];
  126. // Create a unique endpoint for each call so we don't have
  127. // to wait for the sockets to unbind.
  128. snprintf (client_mon_endpoint, MAX_SOCKET_STRING, "inproc://client%s%d",
  129. expected_prefix_, type_);
  130. snprintf (server_mon_endpoint, MAX_SOCKET_STRING, "inproc://server%s%d",
  131. expected_prefix_, type_);
  132. // We'll monitor these two sockets
  133. void *client = test_context_socket (ZMQ_DEALER);
  134. void *server = test_context_socket (ZMQ_DEALER);
  135. // Monitor all events on client and server sockets
  136. TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor_versioned (
  137. client, client_mon_endpoint, ZMQ_EVENT_ALL_V2, 2, type_));
  138. TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor_versioned (
  139. server, server_mon_endpoint, ZMQ_EVENT_ALL_V2, 2, type_));
  140. // Choose the appropriate consumer socket type.
  141. int mon_type = ZMQ_PAIR;
  142. switch (type_) {
  143. case ZMQ_PAIR:
  144. mon_type = ZMQ_PAIR;
  145. break;
  146. case ZMQ_PUSH:
  147. mon_type = ZMQ_PULL;
  148. break;
  149. case ZMQ_PUB:
  150. mon_type = ZMQ_SUB;
  151. break;
  152. }
  153. // Create two sockets for collecting monitor events
  154. void *client_mon = test_context_socket (mon_type);
  155. void *server_mon = test_context_socket (mon_type);
  156. // Additionally subscribe to all events if a PUB socket is used.
  157. if (type_ == ZMQ_PUB) {
  158. TEST_ASSERT_SUCCESS_ERRNO (
  159. zmq_setsockopt (client_mon, ZMQ_SUBSCRIBE, "", 0));
  160. TEST_ASSERT_SUCCESS_ERRNO (
  161. zmq_setsockopt (server_mon, ZMQ_SUBSCRIBE, "", 0));
  162. }
  163. // Connect these to the inproc endpoints so they'll get events
  164. TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (client_mon, client_mon_endpoint));
  165. TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (server_mon, server_mon_endpoint));
  166. // Now do a basic ping test
  167. bind_function_ (server, server_endpoint, sizeof server_endpoint);
  168. int ipv6;
  169. size_t ipv6_size = sizeof (ipv6);
  170. TEST_ASSERT_SUCCESS_ERRNO (
  171. zmq_getsockopt (server, ZMQ_IPV6, &ipv6, &ipv6_size));
  172. TEST_ASSERT_SUCCESS_ERRNO (
  173. zmq_setsockopt (client, ZMQ_IPV6, &ipv6, sizeof (int)));
  174. TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (client, server_endpoint));
  175. bounce (server, client);
  176. // Close client and server
  177. // TODO why does this use zero_linger?
  178. test_context_socket_close_zero_linger (client);
  179. test_context_socket_close_zero_linger (server);
  180. char *client_local_address = NULL;
  181. char *client_remote_address = NULL;
  182. // Now collect and check events from both sockets
  183. int64_t event = get_monitor_event_v2 (
  184. client_mon, NULL, &client_local_address, &client_remote_address);
  185. if (event == ZMQ_EVENT_CONNECT_DELAYED) {
  186. free (client_local_address);
  187. free (client_remote_address);
  188. event = get_monitor_event_v2 (client_mon, NULL, &client_local_address,
  189. &client_remote_address);
  190. }
  191. TEST_ASSERT_EQUAL (ZMQ_EVENT_CONNECTED, event);
  192. TEST_ASSERT_EQUAL_STRING (server_endpoint, client_remote_address);
  193. TEST_ASSERT_EQUAL_STRING_LEN (expected_prefix_, client_local_address,
  194. strlen (expected_prefix_));
  195. TEST_ASSERT_NOT_EQUAL (
  196. 0, strcmp (client_local_address, client_remote_address));
  197. expect_monitor_event_v2 (client_mon, ZMQ_EVENT_HANDSHAKE_SUCCEEDED,
  198. client_local_address, client_remote_address);
  199. event = get_monitor_event_v2 (client_mon, NULL, NULL, NULL);
  200. if (event == ZMQ_EVENT_DISCONNECTED) {
  201. expect_monitor_event_v2 (client_mon, ZMQ_EVENT_CONNECT_RETRIED,
  202. client_local_address, client_remote_address);
  203. expect_monitor_event_v2 (client_mon, ZMQ_EVENT_MONITOR_STOPPED, "", "");
  204. } else
  205. TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_MONITOR_STOPPED, event);
  206. // This is the flow of server events
  207. expect_monitor_event_v2 (server_mon, ZMQ_EVENT_LISTENING,
  208. client_remote_address, "");
  209. expect_monitor_event_v2 (server_mon, ZMQ_EVENT_ACCEPTED,
  210. client_remote_address, client_local_address);
  211. expect_monitor_event_v2 (server_mon, ZMQ_EVENT_HANDSHAKE_SUCCEEDED,
  212. client_remote_address, client_local_address);
  213. event = get_monitor_event_v2 (server_mon, NULL, NULL, NULL);
  214. // Sometimes the server sees the client closing before it gets closed.
  215. if (event != ZMQ_EVENT_DISCONNECTED) {
  216. TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_CLOSED, event);
  217. event = get_monitor_event_v2 (server_mon, NULL, NULL, NULL);
  218. }
  219. if (event != ZMQ_EVENT_DISCONNECTED) {
  220. TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_MONITOR_STOPPED, event);
  221. }
  222. // TODO: When not waiting until the monitor stopped, the I/O thread runs
  223. // into some deadlock. This must be fixed, but until it is fixed, we wait
  224. // here in order to have more reliable test execution.
  225. while (event != ZMQ_EVENT_MONITOR_STOPPED) {
  226. event = get_monitor_event_v2 (server_mon, NULL, NULL, NULL);
  227. }
  228. free (client_local_address);
  229. free (client_remote_address);
  230. // Close down the sockets
  231. // TODO why does this use zero_linger?
  232. test_context_socket_close_zero_linger (client_mon);
  233. test_context_socket_close_zero_linger (server_mon);
  234. }
  235. void test_monitor_versioned_basic_tcp_ipv4 ()
  236. {
  237. static const char prefix[] = "tcp://127.0.0.1:";
  238. test_monitor_versioned_basic (bind_loopback_ipv4, prefix, ZMQ_PAIR);
  239. test_monitor_versioned_basic (bind_loopback_ipv4, prefix, ZMQ_PUB);
  240. test_monitor_versioned_basic (bind_loopback_ipv4, prefix, ZMQ_PUSH);
  241. }
  242. void test_monitor_versioned_basic_tcp_ipv6 ()
  243. {
  244. static const char prefix[] = "tcp://[::1]:";
  245. test_monitor_versioned_basic (bind_loopback_ipv6, prefix, ZMQ_PAIR);
  246. test_monitor_versioned_basic (bind_loopback_ipv6, prefix, ZMQ_PUB);
  247. test_monitor_versioned_basic (bind_loopback_ipv6, prefix, ZMQ_PUSH);
  248. }
  249. void test_monitor_versioned_basic_ipc ()
  250. {
  251. static const char prefix[] = "ipc://";
  252. test_monitor_versioned_basic (bind_loopback_ipc, prefix, ZMQ_PAIR);
  253. test_monitor_versioned_basic (bind_loopback_ipc, prefix, ZMQ_PUB);
  254. test_monitor_versioned_basic (bind_loopback_ipc, prefix, ZMQ_PUSH);
  255. }
  256. void test_monitor_versioned_basic_tipc ()
  257. {
  258. static const char prefix[] = "tipc://";
  259. test_monitor_versioned_basic (bind_loopback_tipc, prefix, ZMQ_PAIR);
  260. test_monitor_versioned_basic (bind_loopback_tipc, prefix, ZMQ_PUB);
  261. test_monitor_versioned_basic (bind_loopback_tipc, prefix, ZMQ_PUSH);
  262. }
  263. #ifdef ZMQ_EVENT_PIPES_STATS
  264. void test_monitor_versioned_stats (bind_function_t bind_function_,
  265. const char *expected_prefix_)
  266. {
  267. char server_endpoint[MAX_SOCKET_STRING];
  268. const int pulls_count = 4;
  269. void *pulls[pulls_count];
  270. // We'll monitor these two sockets
  271. void *push = test_context_socket (ZMQ_PUSH);
  272. TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor_versioned (
  273. push, "inproc://monitor-push", ZMQ_EVENT_PIPES_STATS, 2, ZMQ_PAIR));
  274. // Should fail if there are no pipes to monitor
  275. TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_socket_monitor_pipes_stats (push));
  276. void *push_mon = test_context_socket (ZMQ_PAIR);
  277. TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (push_mon, "inproc://monitor-push"));
  278. // Set lower HWM - queues will be filled so we should see it in the stats
  279. int send_hwm = 500;
  280. TEST_ASSERT_SUCCESS_ERRNO (
  281. zmq_setsockopt (push, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm)));
  282. // Set very low TCP buffers so that messages cannot be stored in-flight
  283. const int tcp_buffer_size = 4096;
  284. TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
  285. push, ZMQ_SNDBUF, &tcp_buffer_size, sizeof (tcp_buffer_size)));
  286. bind_function_ (push, server_endpoint, sizeof (server_endpoint));
  287. int ipv6;
  288. size_t ipv6_size = sizeof (ipv6);
  289. TEST_ASSERT_SUCCESS_ERRNO (
  290. zmq_getsockopt (push, ZMQ_IPV6, &ipv6, &ipv6_size));
  291. for (int i = 0; i < pulls_count; ++i) {
  292. pulls[i] = test_context_socket (ZMQ_PULL);
  293. TEST_ASSERT_SUCCESS_ERRNO (
  294. zmq_setsockopt (pulls[i], ZMQ_IPV6, &ipv6, sizeof (int)));
  295. int timeout_ms = 10;
  296. TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
  297. pulls[i], ZMQ_RCVTIMEO, &timeout_ms, sizeof (timeout_ms)));
  298. TEST_ASSERT_SUCCESS_ERRNO (
  299. zmq_setsockopt (pulls[i], ZMQ_RCVHWM, &send_hwm, sizeof (send_hwm)));
  300. TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
  301. pulls[i], ZMQ_RCVBUF, &tcp_buffer_size, sizeof (tcp_buffer_size)));
  302. TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pulls[i], server_endpoint));
  303. }
  304. // Send until we block
  305. int send_count = 0;
  306. // Saturate the TCP buffers too
  307. char data[tcp_buffer_size * 2];
  308. memset (data, 0, sizeof (data));
  309. // Saturate all pipes - send + receive - on all connections
  310. while (send_count < send_hwm * 2 * pulls_count) {
  311. TEST_ASSERT_EQUAL_INT (sizeof (data),
  312. zmq_send (push, data, sizeof (data), 0));
  313. ++send_count;
  314. }
  315. // Drain one of the pulls - doesn't matter how many messages, at least one
  316. send_count = send_count / 4;
  317. do {
  318. zmq_recv (pulls[0], data, sizeof (data), 0);
  319. --send_count;
  320. } while (send_count > 0);
  321. // To kick the application thread, do a dummy getsockopt - users here
  322. // should use the monitor and the other sockets in a poll.
  323. unsigned long int dummy;
  324. size_t dummy_size = sizeof (dummy);
  325. msleep (SETTLE_TIME);
  326. // Note that the pipe stats on the sender will not get updated until the
  327. // receiver has processed at least lwm ((hwm + 1) / 2) messages AND until
  328. // the application thread has ran through the mailbox, as the update is
  329. // delivered via a message (send_activate_write)
  330. zmq_getsockopt (push, ZMQ_EVENTS, &dummy, &dummy_size);
  331. // Ask for stats and check that they match
  332. zmq_socket_monitor_pipes_stats (push);
  333. msleep (SETTLE_TIME);
  334. zmq_getsockopt (push, ZMQ_EVENTS, &dummy, &dummy_size);
  335. for (int i = 0; i < pulls_count; ++i) {
  336. char *push_local_address = NULL;
  337. char *push_remote_address = NULL;
  338. uint64_t queue_stat[2];
  339. int64_t event = get_monitor_event_v2 (
  340. push_mon, queue_stat, &push_local_address, &push_remote_address);
  341. TEST_ASSERT_EQUAL_STRING (server_endpoint, push_local_address);
  342. TEST_ASSERT_EQUAL_STRING_LEN (expected_prefix_, push_remote_address,
  343. strlen (expected_prefix_));
  344. TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_PIPES_STATS, event);
  345. TEST_ASSERT_EQUAL_INT (i == 0 ? 0 : send_hwm, queue_stat[0]);
  346. TEST_ASSERT_EQUAL_INT (0, queue_stat[1]);
  347. free (push_local_address);
  348. free (push_remote_address);
  349. }
  350. // Close client and server
  351. test_context_socket_close_zero_linger (push_mon);
  352. test_context_socket_close_zero_linger (push);
  353. for (int i = 0; i < pulls_count; ++i)
  354. test_context_socket_close_zero_linger (pulls[i]);
  355. }
  356. void test_monitor_versioned_stats_tcp_ipv4 ()
  357. {
  358. static const char prefix[] = "tcp://127.0.0.1:";
  359. test_monitor_versioned_stats (bind_loopback_ipv4, prefix);
  360. }
  361. void test_monitor_versioned_stats_tcp_ipv6 ()
  362. {
  363. static const char prefix[] = "tcp://[::1]:";
  364. test_monitor_versioned_stats (bind_loopback_ipv6, prefix);
  365. }
  366. void test_monitor_versioned_stats_ipc ()
  367. {
  368. static const char prefix[] = "ipc://";
  369. test_monitor_versioned_stats (bind_loopback_ipc, prefix);
  370. }
  371. #endif // ZMQ_EVENT_PIPES_STATS
  372. #endif
  373. int main ()
  374. {
  375. setup_test_environment ();
  376. UNITY_BEGIN ();
  377. RUN_TEST (test_monitor_invalid_protocol_fails);
  378. RUN_TEST (test_monitor_basic);
  379. #if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) \
  380. || (defined ZMQ_CURRENT_EVENT_VERSION \
  381. && ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2)
  382. RUN_TEST (test_monitor_versioned_invalid_socket_type);
  383. RUN_TEST (test_monitor_versioned_basic_tcp_ipv4);
  384. RUN_TEST (test_monitor_versioned_basic_tcp_ipv6);
  385. RUN_TEST (test_monitor_versioned_basic_ipc);
  386. RUN_TEST (test_monitor_versioned_basic_tipc);
  387. #ifdef ZMQ_EVENT_PIPES_STATS
  388. RUN_TEST (test_monitor_versioned_stats_tcp_ipv4);
  389. RUN_TEST (test_monitor_versioned_stats_tcp_ipv6);
  390. RUN_TEST (test_monitor_versioned_stats_ipc);
  391. #endif
  392. #endif
  393. return UNITY_END ();
  394. }