inproc_thr.cpp 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. /*
  2. Copyright (c) 2007-2012 iMatix Corporation
  3. Copyright (c) 2009-2011 250bpm s.r.o.
  4. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
  5. This file is part of libzmq, the ZeroMQ core engine in C++.
  6. libzmq is free software; you can redistribute it and/or modify it under
  7. the terms of the GNU Lesser General Public License (LGPL) as published
  8. by the Free Software Foundation; either version 3 of the License, or
  9. (at your option) any later version.
  10. As a special exception, the Contributors give you permission to link
  11. this library with independent modules to produce an executable,
  12. regardless of the license terms of these independent modules, and to
  13. copy and distribute the resulting executable under terms of your choice,
  14. provided that you also meet, for each linked independent module, the
  15. terms and conditions of the license of that module. An independent
  16. module is a module which is not derived from or based on this library.
  17. If you modify this library, you must extend this exception to your
  18. version of the library.
  19. libzmq is distributed in the hope that it will be useful, but WITHOUT
  20. ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  21. FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
  22. License for more details.
  23. You should have received a copy of the GNU Lesser General Public License
  24. along with this program. If not, see <http://www.gnu.org/licenses/>.
  25. */
#include "../include/zmq.h"
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "platform.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include <windows.h>
#include <process.h>
#else
#include <pthread.h>
#endif
  37. static int message_count;
  38. static size_t message_size;
  39. #if defined ZMQ_HAVE_WINDOWS
  40. static unsigned int __stdcall worker (void *ctx_)
  41. #else
  42. static void *worker (void *ctx_)
  43. #endif
  44. {
  45. void *s;
  46. int rc;
  47. int i;
  48. zmq_msg_t msg;
  49. s = zmq_socket (ctx_, ZMQ_PUSH);
  50. if (!s) {
  51. printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
  52. exit (1);
  53. }
  54. rc = zmq_connect (s, "inproc://thr_test");
  55. if (rc != 0) {
  56. printf ("error in zmq_connect: %s\n", zmq_strerror (errno));
  57. exit (1);
  58. }
  59. for (i = 0; i != message_count; i++) {
  60. rc = zmq_msg_init_size (&msg, message_size);
  61. if (rc != 0) {
  62. printf ("error in zmq_msg_init_size: %s\n", zmq_strerror (errno));
  63. exit (1);
  64. }
  65. #if defined ZMQ_MAKE_VALGRIND_HAPPY
  66. memset (zmq_msg_data (&msg), 0, message_size);
  67. #endif
  68. rc = zmq_sendmsg (s, &msg, 0);
  69. if (rc < 0) {
  70. printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno));
  71. exit (1);
  72. }
  73. rc = zmq_msg_close (&msg);
  74. if (rc != 0) {
  75. printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
  76. exit (1);
  77. }
  78. }
  79. rc = zmq_close (s);
  80. if (rc != 0) {
  81. printf ("error in zmq_close: %s\n", zmq_strerror (errno));
  82. exit (1);
  83. }
  84. #if defined ZMQ_HAVE_WINDOWS
  85. return 0;
  86. #else
  87. return NULL;
  88. #endif
  89. }
  90. int main (int argc, char *argv[])
  91. {
  92. #if defined ZMQ_HAVE_WINDOWS
  93. HANDLE local_thread;
  94. #else
  95. pthread_t local_thread;
  96. #endif
  97. void *ctx;
  98. void *s;
  99. int rc;
  100. int i;
  101. zmq_msg_t msg;
  102. void *watch;
  103. unsigned long elapsed;
  104. unsigned long throughput;
  105. double megabits;
  106. if (argc != 3) {
  107. printf ("usage: inproc_thr <message-size> <message-count>\n");
  108. return 1;
  109. }
  110. message_size = atoi (argv[1]);
  111. message_count = atoi (argv[2]);
  112. ctx = zmq_init (1);
  113. if (!ctx) {
  114. printf ("error in zmq_init: %s\n", zmq_strerror (errno));
  115. return -1;
  116. }
  117. s = zmq_socket (ctx, ZMQ_PULL);
  118. if (!s) {
  119. printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
  120. return -1;
  121. }
  122. rc = zmq_bind (s, "inproc://thr_test");
  123. if (rc != 0) {
  124. printf ("error in zmq_bind: %s\n", zmq_strerror (errno));
  125. return -1;
  126. }
  127. #if defined ZMQ_HAVE_WINDOWS
  128. local_thread = (HANDLE) _beginthreadex (NULL, 0, worker, ctx, 0, NULL);
  129. if (local_thread == 0) {
  130. printf ("error in _beginthreadex\n");
  131. return -1;
  132. }
  133. #else
  134. rc = pthread_create (&local_thread, NULL, worker, ctx);
  135. if (rc != 0) {
  136. printf ("error in pthread_create: %s\n", zmq_strerror (rc));
  137. return -1;
  138. }
  139. #endif
  140. rc = zmq_msg_init (&msg);
  141. if (rc != 0) {
  142. printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno));
  143. return -1;
  144. }
  145. printf ("message size: %d [B]\n", (int) message_size);
  146. printf ("message count: %d\n", (int) message_count);
  147. rc = zmq_recvmsg (s, &msg, 0);
  148. if (rc < 0) {
  149. printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
  150. return -1;
  151. }
  152. if (zmq_msg_size (&msg) != message_size) {
  153. printf ("message of incorrect size received\n");
  154. return -1;
  155. }
  156. watch = zmq_stopwatch_start ();
  157. for (i = 0; i != message_count - 1; i++) {
  158. rc = zmq_recvmsg (s, &msg, 0);
  159. if (rc < 0) {
  160. printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
  161. return -1;
  162. }
  163. if (zmq_msg_size (&msg) != message_size) {
  164. printf ("message of incorrect size received\n");
  165. return -1;
  166. }
  167. }
  168. elapsed = zmq_stopwatch_stop (watch);
  169. if (elapsed == 0)
  170. elapsed = 1;
  171. rc = zmq_msg_close (&msg);
  172. if (rc != 0) {
  173. printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
  174. return -1;
  175. }
  176. #if defined ZMQ_HAVE_WINDOWS
  177. DWORD rc2 = WaitForSingleObject (local_thread, INFINITE);
  178. if (rc2 == WAIT_FAILED) {
  179. printf ("error in WaitForSingleObject\n");
  180. return -1;
  181. }
  182. BOOL rc3 = CloseHandle (local_thread);
  183. if (rc3 == 0) {
  184. printf ("error in CloseHandle\n");
  185. return -1;
  186. }
  187. #else
  188. rc = pthread_join (local_thread, NULL);
  189. if (rc != 0) {
  190. printf ("error in pthread_join: %s\n", zmq_strerror (rc));
  191. return -1;
  192. }
  193. #endif
  194. rc = zmq_close (s);
  195. if (rc != 0) {
  196. printf ("error in zmq_close: %s\n", zmq_strerror (errno));
  197. return -1;
  198. }
  199. rc = zmq_ctx_term (ctx);
  200. if (rc != 0) {
  201. printf ("error in zmq_ctx_term: %s\n", zmq_strerror (errno));
  202. return -1;
  203. }
  204. throughput =
  205. (unsigned long) ((double) message_count / (double) elapsed * 1000000);
  206. megabits = (double) (throughput * message_size * 8) / 1000000;
  207. printf ("mean throughput: %d [msg/s]\n", (int) throughput);
  208. printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits);
  209. return 0;
  210. }