proxy_thr.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. /*
  2. Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
  3. This file is part of libzmq, the ZeroMQ core engine in C++.
  4. libzmq is free software; you can redistribute it and/or modify it under
  5. the terms of the GNU Lesser General Public License (LGPL) as published
  6. by the Free Software Foundation; either version 3 of the License, or
  7. (at your option) any later version.
  8. As a special exception, the Contributors give you permission to link
  9. this library with independent modules to produce an executable,
  10. regardless of the license terms of these independent modules, and to
  11. copy and distribute the resulting executable under terms of your choice,
  12. provided that you also meet, for each linked independent module, the
  13. terms and conditions of the license of that module. An independent
  14. module is a module which is not derived from or based on this library.
  15. If you modify this library, you must extend this exception to your
  16. version of the library.
  17. libzmq is distributed in the hope that it will be useful, but WITHOUT
  18. ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  19. FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
  20. License for more details.
  21. You should have received a copy of the GNU Lesser General Public License
  22. along with this program. If not, see <http://www.gnu.org/licenses/>.
  23. */
  24. #include "../include/zmq.h"
  25. #include <stdio.h>
  26. #include <stdlib.h>
  27. #include <string.h>
  28. #include <assert.h>
  29. #include <time.h>
  30. #include <stdarg.h>
  31. #include <string.h>
  32. #include <string>
  33. #include "platform.hpp"
  34. #if defined ZMQ_HAVE_WINDOWS
  35. #include <windows.h>
  36. #include <process.h>
  37. #else
  38. #include <pthread.h>
  39. #include <unistd.h>
  40. #endif
  41. /*
  42. Asynchronous proxy benchmark using ZMQ_XPUB_NODROP.
  43. Topology:
  44. XPUB SUB
  45. | |
  46. +-----> XSUB -> XPUB -----/
  47. | ^^^^^^^^^^^^
  48. XPUB ZMQ proxy
  49. All connections use "inproc" transport. The two XPUB sockets start
  50. flooding the proxy. The throughput is computed using the bytes received
  51. in the SUB socket.
  52. */
  53. #define HWM 10000
  54. #ifndef ARRAY_SIZE
  55. #define ARRAY_SIZE(x) (sizeof (x) / sizeof (*x))
  56. #endif
  57. #define TEST_ASSERT_SUCCESS_ERRNO(expr) \
  58. test_assert_success_message_errno_helper (expr, NULL, #expr)
  59. // This macro is used to avoid-variable warning. If used with an expression,
  60. // the sizeof is not evaluated to avoid polluting the assembly code.
  61. #ifdef NDEBUG
  62. #define ASSERT_EXPR_SAFE(x) \
  63. do { \
  64. (void) sizeof (x); \
  65. } while (0)
  66. #else
  67. #define ASSERT_EXPR_SAFE(x) assert (x)
  68. #endif
  69. static uint64_t message_count = 0;
  70. static size_t message_size = 0;
  71. typedef struct
  72. {
  73. void *context;
  74. int thread_idx;
  75. const char *frontend_endpoint[4];
  76. const char *backend_endpoint[4];
  77. const char *control_endpoint;
  78. } proxy_hwm_cfg_t;
  79. int test_assert_success_message_errno_helper (int rc_,
  80. const char *msg_,
  81. const char *expr_)
  82. {
  83. if (rc_ == -1) {
  84. char buffer[512];
  85. buffer[sizeof (buffer) - 1] =
  86. 0; // to ensure defined behavior with VC++ <= 2013
  87. printf ("%s failed%s%s%s, errno = %i (%s)", expr_,
  88. msg_ ? " (additional info: " : "", msg_ ? msg_ : "",
  89. msg_ ? ")" : "", zmq_errno (), zmq_strerror (zmq_errno ()));
  90. exit (1);
  91. }
  92. return rc_;
  93. }
  94. static void set_hwm (void *skt)
  95. {
  96. int hwm = HWM;
  97. TEST_ASSERT_SUCCESS_ERRNO (
  98. zmq_setsockopt (skt, ZMQ_SNDHWM, &hwm, sizeof (hwm)));
  99. TEST_ASSERT_SUCCESS_ERRNO (
  100. zmq_setsockopt (skt, ZMQ_RCVHWM, &hwm, sizeof (hwm)));
  101. }
  102. static void publisher_thread_main (void *pvoid)
  103. {
  104. const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
  105. const int idx = cfg->thread_idx;
  106. int optval;
  107. int rc;
  108. void *pubsocket = zmq_socket (cfg->context, ZMQ_XPUB);
  109. assert (pubsocket);
  110. set_hwm (pubsocket);
  111. optval = 1;
  112. TEST_ASSERT_SUCCESS_ERRNO (
  113. zmq_setsockopt (pubsocket, ZMQ_XPUB_NODROP, &optval, sizeof (optval)));
  114. optval = 1;
  115. TEST_ASSERT_SUCCESS_ERRNO (
  116. zmq_setsockopt (pubsocket, ZMQ_SNDTIMEO, &optval, sizeof (optval)));
  117. TEST_ASSERT_SUCCESS_ERRNO (
  118. zmq_connect (pubsocket, cfg->frontend_endpoint[idx]));
  119. // Wait before starting TX operations till 1 subscriber has subscribed
  120. // (in this test there's 1 subscriber only)
  121. char buffer[32] = {};
  122. rc = TEST_ASSERT_SUCCESS_ERRNO (
  123. zmq_recv (pubsocket, buffer, sizeof (buffer), 0));
  124. if (rc != 1) {
  125. printf ("invalid response length: expected 1, received %d", rc);
  126. exit (1);
  127. }
  128. if (buffer[0] != 1) {
  129. printf ("invalid response value: expected 1, received %d",
  130. (int) buffer[0]);
  131. exit (1);
  132. }
  133. zmq_msg_t msg_orig;
  134. rc = zmq_msg_init_size (&msg_orig, message_size);
  135. assert (rc == 0);
  136. memset (zmq_msg_data (&msg_orig), 'A', zmq_msg_size (&msg_orig));
  137. uint64_t send_count = 0;
  138. while (send_count < message_count) {
  139. zmq_msg_t msg;
  140. zmq_msg_init (&msg);
  141. rc = zmq_msg_copy (&msg, &msg_orig);
  142. assert (rc == 0);
  143. // Send the message to the socket
  144. rc = zmq_msg_send (&msg, pubsocket, 0);
  145. if (rc != -1) {
  146. send_count++;
  147. } else {
  148. TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
  149. }
  150. }
  151. zmq_close (pubsocket);
  152. //printf ("publisher thread ended\n");
  153. }
  154. static void subscriber_thread_main (void *pvoid)
  155. {
  156. const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
  157. const int idx = cfg->thread_idx;
  158. void *subsocket = zmq_socket (cfg->context, ZMQ_SUB);
  159. assert (subsocket);
  160. set_hwm (subsocket);
  161. TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (subsocket, ZMQ_SUBSCRIBE, 0, 0));
  162. TEST_ASSERT_SUCCESS_ERRNO (
  163. zmq_connect (subsocket, cfg->backend_endpoint[idx]));
  164. // Receive message_count messages
  165. uint64_t rxsuccess = 0;
  166. bool success = true;
  167. while (success) {
  168. zmq_msg_t msg;
  169. int rc = zmq_msg_init (&msg);
  170. assert (rc == 0);
  171. rc = zmq_msg_recv (&msg, subsocket, 0);
  172. if (rc != -1) {
  173. TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
  174. rxsuccess++;
  175. }
  176. if (rxsuccess == message_count)
  177. break;
  178. }
  179. // Cleanup
  180. zmq_close (subsocket);
  181. //printf ("subscriber thread ended\n");
  182. }
  183. static void proxy_thread_main (void *pvoid)
  184. {
  185. const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
  186. int rc;
  187. // FRONTEND SUB
  188. void *frontend_xsub = zmq_socket (
  189. cfg->context,
  190. ZMQ_XSUB); // the frontend is the one exposed to internal threads (INPROC)
  191. assert (frontend_xsub);
  192. set_hwm (frontend_xsub);
  193. // Bind FRONTEND
  194. for (unsigned int i = 0; i < ARRAY_SIZE (cfg->frontend_endpoint); i++) {
  195. const char *ep = cfg->frontend_endpoint[i];
  196. if (ep != NULL) {
  197. assert (strlen (ep) > 5);
  198. rc = zmq_bind (frontend_xsub, ep);
  199. ASSERT_EXPR_SAFE (rc == 0);
  200. }
  201. }
  202. // BACKEND PUB
  203. void *backend_xpub = zmq_socket (
  204. cfg->context,
  205. ZMQ_XPUB); // the backend is the one exposed to the external world (TCP)
  206. assert (backend_xpub);
  207. int optval = 1;
  208. rc =
  209. zmq_setsockopt (backend_xpub, ZMQ_XPUB_NODROP, &optval, sizeof (optval));
  210. ASSERT_EXPR_SAFE (rc == 0);
  211. set_hwm (backend_xpub);
  212. // Bind BACKEND
  213. for (unsigned int i = 0; i < ARRAY_SIZE (cfg->backend_endpoint); i++) {
  214. const char *ep = cfg->backend_endpoint[i];
  215. if (ep != NULL) {
  216. assert (strlen (ep) > 5);
  217. rc = zmq_bind (backend_xpub, ep);
  218. ASSERT_EXPR_SAFE (rc == 0);
  219. }
  220. }
  221. // CONTROL REP
  222. void *control_rep = zmq_socket (
  223. cfg->context,
  224. ZMQ_REP); // This one is used by the proxy to receive&reply to commands
  225. assert (control_rep);
  226. // Bind CONTROL
  227. rc = zmq_bind (control_rep, cfg->control_endpoint);
  228. ASSERT_EXPR_SAFE (rc == 0);
  229. // Start proxying!
  230. zmq_proxy_steerable (frontend_xsub, backend_xpub, NULL, control_rep);
  231. zmq_close (frontend_xsub);
  232. zmq_close (backend_xpub);
  233. zmq_close (control_rep);
  234. //printf ("proxy thread ended\n");
  235. }
  236. void terminate_proxy (const proxy_hwm_cfg_t *cfg)
  237. {
  238. // CONTROL REQ
  239. void *control_req = zmq_socket (
  240. cfg->context,
  241. ZMQ_REQ); // This one can be used to send command to the proxy
  242. assert (control_req);
  243. // Connect CONTROL-REQ: a socket to which send commands
  244. int rc = zmq_connect (control_req, cfg->control_endpoint);
  245. ASSERT_EXPR_SAFE (rc == 0);
  246. // Ask the proxy to exit: the subscriber has received all messages
  247. rc = zmq_send (control_req, "TERMINATE", 9, 0);
  248. ASSERT_EXPR_SAFE (rc == 9);
  249. zmq_close (control_req);
  250. }
  251. // The main thread simply starts some publishers, a proxy,
  252. // and a subscriber. Finish when all packets are received.
  253. int main (int argc, char *argv[])
  254. {
  255. if (argc != 3) {
  256. printf ("usage: proxy_thr <message-size> <message-count>\n");
  257. return 1;
  258. }
  259. message_size = atoi (argv[1]);
  260. message_count = atoi (argv[2]);
  261. printf ("message size: %d [B]\n", (int) message_size);
  262. printf ("message count: %d\n", (int) message_count);
  263. void *context = zmq_ctx_new ();
  264. assert (context);
  265. int rv = zmq_ctx_set (context, ZMQ_IO_THREADS, 4);
  266. ASSERT_EXPR_SAFE (rv == 0);
  267. // START ALL SECONDARY THREADS
  268. const char *pub1 = "inproc://perf_pub1";
  269. const char *pub2 = "inproc://perf_pub2";
  270. const char *sub1 = "inproc://perf_backend";
  271. proxy_hwm_cfg_t cfg_global = {};
  272. cfg_global.context = context;
  273. cfg_global.frontend_endpoint[0] = pub1;
  274. cfg_global.frontend_endpoint[1] = pub2;
  275. cfg_global.backend_endpoint[0] = sub1;
  276. cfg_global.control_endpoint = "inproc://ctrl";
  277. // Proxy
  278. proxy_hwm_cfg_t cfg_proxy = cfg_global;
  279. void *proxy = zmq_threadstart (&proxy_thread_main, (void *) &cfg_proxy);
  280. assert (proxy != 0);
  281. // Subscriber 1
  282. proxy_hwm_cfg_t cfg_sub1 = cfg_global;
  283. cfg_sub1.thread_idx = 0;
  284. void *subscriber =
  285. zmq_threadstart (&subscriber_thread_main, (void *) &cfg_sub1);
  286. assert (subscriber != 0);
  287. // Start measuring
  288. void *watch = zmq_stopwatch_start ();
  289. // Publisher 1
  290. proxy_hwm_cfg_t cfg_pub1 = cfg_global;
  291. cfg_pub1.thread_idx = 0;
  292. void *publisher1 =
  293. zmq_threadstart (&publisher_thread_main, (void *) &cfg_pub1);
  294. assert (publisher1 != 0);
  295. // Publisher 2
  296. proxy_hwm_cfg_t cfg_pub2 = cfg_global;
  297. cfg_pub2.thread_idx = 1;
  298. void *publisher2 =
  299. zmq_threadstart (&publisher_thread_main, (void *) &cfg_pub2);
  300. assert (publisher2 != 0);
  301. // Wait for all packets to be received
  302. zmq_threadclose (subscriber);
  303. // Stop measuring
  304. unsigned long elapsed = zmq_stopwatch_stop (watch);
  305. if (elapsed == 0)
  306. elapsed = 1;
  307. unsigned long throughput =
  308. (unsigned long) ((double) message_count / (double) elapsed * 1000000);
  309. double megabits = (double) (throughput * message_size * 8) / 1000000;
  310. printf ("mean throughput: %d [msg/s]\n", (int) throughput);
  311. printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits);
  312. // Wait for the end of publishers...
  313. zmq_threadclose (publisher1);
  314. zmq_threadclose (publisher2);
  315. // ... then close the proxy
  316. terminate_proxy (&cfg_proxy);
  317. zmq_threadclose (proxy);
  318. int rc = zmq_ctx_term (context);
  319. ASSERT_EXPR_SAFE (rc == 0);
  320. return 0;
  321. }