test_multithread.cpp 6.8 KB


  /*
  The server thread listens on a ZMQ_SERVER socket and forwards each incoming
  message to the worker threads over a ZMQ_PUSH/ZMQ_PULL pair.
  Each worker thread receives a message and sends it back through the
  ZMQ_SERVER socket.
  Each client thread opens CLIENT_CONNECTION ZMQ_CLIENT sockets, sends a
  random-size message on every socket and checks the server's answer.
  */
  8. #define ZMQ_BUILD_DRAFT_API
  9. #include "../../../../include/zmq.h"
  10. #pragma comment(lib,"libzmq.lib")
  11. #include <assert.h>
  12. #include <stdlib.h>
  13. #include <thread>
  14. #include <atomic>
  15. #define SERVER_ADDR "tcp://127.0.0.1:12345"
  16. #define SERVER_WORKER_COUNT 3 // worker threads count
  17. #define CLIENT_COUNT 5 // client threads count
  18. #define CLIENT_CONNECTION 100 // ZMQ_CLIENT sockets at each client
  19. #define CLIENT_RECCONECT 1000 // reconnect one socket after messages
  20. #define MESSAGE_MAX_SIZE 1024
  21. //*******************************************************************
  22. //****** MESSAGE ****************************************************
  23. //*******************************************************************
  24. void message_fill(zmq_msg_t* msg, int val) {
  25. assert(val > 0);
  26. int size = sizeof(int) * 2 + val;
  27. int rc = zmq_msg_init_size(msg, size); assert(rc == 0);
  28. uint8_t* data = (uint8_t*)zmq_msg_data(msg);
  29. memcpy(data, &val, sizeof(int));
  30. data += sizeof(int);
  31. memset(data, val & 0xFF, val);
  32. int check_sum = val + (val & 0xFF) * val;
  33. data += val;
  34. memcpy(data, &check_sum, sizeof(int));
  35. }
  36. int message_check(zmq_msg_t* msg) {
  37. uint8_t* data = (uint8_t*)zmq_msg_data(msg);
  38. int size = zmq_msg_size(msg);
  39. assert(size > sizeof(int) * 2);
  40. // check size
  41. int val;
  42. memcpy(&val, data, sizeof(int));
  43. if(size != sizeof(int) * 2 + val) {
  44. fprintf(stderr, "wrong message: val = %d size = %d\n", val, size);
  45. return -1;
  46. }
  47. // check sum
  48. data += sizeof(int);
  49. int cs = val;
  50. for(int i = 0; i < val; i++) {
  51. cs += data[i];
  52. }
  53. data += val;
  54. int check_sum;
  55. memcpy(&check_sum, data, sizeof(int));
  56. if(check_sum != cs) {
  57. fprintf(stderr, "wrong message: cs = %d check_sum = %d\n", cs, check_sum);
  58. return -1;
  59. }
  60. return val;
  61. }
  62. //*******************************************************************
  63. //****** SERVER *****************************************************
  64. //*******************************************************************
  65. void *server_ctx = NULL;
  66. void *server_sock = NULL;
  67. std::atomic<int> worker_cnt[SERVER_WORKER_COUNT] = {0}; // statistic
  68. // worker thread
  69. void worker(int num) {
  70. printf("worker %d start\n", num);
  71. void* queue = zmq_socket(server_ctx, ZMQ_PULL); assert(queue);
  72. int rc = zmq_connect(queue, "inproc://queue"); assert(rc == 0);
  73. while (1) {
  74. // receive messages from the queue
  75. zmq_msg_t msg;
  76. rc = zmq_msg_init(&msg); assert(rc == 0);
  77. rc = zmq_msg_recv(&msg, queue, 0); assert(rc > 0);
  78. // check message
  79. //printf("worker %d recv %d bytes at %X from %X\n", num, zmq_msg_size(&msg), zmq_msg_data(&msg), zmq_msg_routing_id(&msg));
  80. // send to client
  81. rc = zmq_msg_send(&msg, server_sock, 0); assert(rc != -1);
  82. worker_cnt[num]++;
  83. }
  84. zmq_close(queue);
  85. }
  86. // server thread
  87. void server() {
  88. server_ctx = zmq_ctx_new(); assert(server_ctx);
  89. // create queue
  90. void* queue = zmq_socket(server_ctx, ZMQ_PUSH); assert(queue);
  91. int rc = zmq_bind(queue, "inproc://queue"); assert(rc == 0);
  92. // start workers
  93. std::thread w[SERVER_WORKER_COUNT];
  94. for (int i = 0; i < SERVER_WORKER_COUNT; i++) w[i] = std::thread(worker, i);
  95. // ZMQ_SERVER for client messages
  96. server_sock = zmq_socket(server_ctx, ZMQ_SERVER); assert(server_sock);
  97. rc = zmq_bind(server_sock, SERVER_ADDR); assert(rc == 0);
  98. while (1) {
  99. // wait client message
  100. zmq_msg_t msg;
  101. rc = zmq_msg_init(&msg); assert(rc == 0);
  102. rc = zmq_msg_recv(&msg, server_sock, 0); assert(rc > 0);
  103. //printf("recv %d bytes at %X from %X\n", zmq_msg_size(&msg), zmq_msg_data(&msg), zmq_msg_routing_id(&msg));
  104. // send message to queue
  105. rc = zmq_msg_send(&msg, queue, 0); assert(rc > 0);
  106. }
  107. }
  108. //*******************************************************************
  109. //****** CLIENT *****************************************************
  110. //*******************************************************************
  111. std::atomic<int> client_cnt[CLIENT_COUNT] = { 0 }; // statistic
  112. std::atomic<int> client_ready = 0;
  113. // client thread
  114. void client(int num)
  115. {
  116. //printf("client %d start. Open %d connections\n", num, CLIENT_CONNECTION);
  117. void *ctx = zmq_ctx_new(); assert(ctx);
  118. void *sock[CLIENT_CONNECTION];
  119. int rc;
  120. // open ZMQ_CLIENT connections
  121. for (int i = 0; i < CLIENT_CONNECTION; i++) {
  122. sock[i] = zmq_socket(ctx, ZMQ_CLIENT); assert(sock[i]);
  123. rc = zmq_connect(sock[i], SERVER_ADDR); assert(rc == 0);
  124. // test connection
  125. zmq_msg_t msg;
  126. int v = rand() % 256 + 1;
  127. message_fill(&msg, v);
  128. rc = zmq_msg_send(&msg, sock[i], 0); assert(rc > 0);
  129. rc = zmq_msg_init(&msg); assert(rc == 0);
  130. rc = zmq_msg_recv(&msg, sock[i], 0); assert(rc > 0);
  131. rc = message_check(&msg); assert(rc == v);
  132. zmq_msg_close(&msg);
  133. }
  134. printf("client %d open %d connections\n", num, CLIENT_CONNECTION);
  135. client_ready++;
  136. while (client_ready < CLIENT_COUNT) Sleep(10); // wait while all clients open sockets
  137. int recconect = 0;
  138. while(1) {
  139. int val[CLIENT_CONNECTION];
  140. zmq_msg_t msg;
  141. // send messages
  142. for(int i = 0; i < CLIENT_CONNECTION; i++) {
  143. val[i] = rand() % MESSAGE_MAX_SIZE + 1;
  144. message_fill(&msg, val[i]);
  145. rc = zmq_msg_send(&msg, sock[i], 0); assert(rc > 0);
  146. }
  147. // recv and check
  148. for (int i = 0; i < CLIENT_CONNECTION; i++) {
  149. rc = zmq_msg_init(&msg); assert(rc == 0);
  150. rc = zmq_msg_recv(&msg, sock[i], 0); assert(rc > 0);
  151. rc = message_check(&msg);
  152. if(rc != val[i] && rc > 0) {
  153. fprintf(stderr, "wrong message: send %d recv %d \n", val[i], rc);
  154. }
  155. zmq_msg_close(&msg);
  156. client_cnt[num]++;
  157. }
  158. // reconnect one
  159. recconect++;
  160. if(recconect == CLIENT_RECCONECT) {
  161. int n = rand() % CLIENT_CONNECTION;
  162. zmq_close(sock[n]);
  163. sock[n] = zmq_socket(ctx, ZMQ_CLIENT); assert(sock[n]);
  164. int rc = zmq_connect(sock[n], SERVER_ADDR); assert(rc == 0);
  165. }
  166. }
  167. }
  168. //*******************************************************************
  169. int main (void) {
  170. int v1, v2, v3; zmq_version(&v1, &v2, &v3);
  171. printf("ZMQ version %d.%d.%d. Compile %s %s\n", v1, v2, v3, __DATE__, __TIME__);
  172. std::thread ct[CLIENT_COUNT];
  173. for (int i = 0; i < CLIENT_COUNT; i++) ct[i] = std::thread(client, i);
  174. std::thread st(server);
  175. int w[SERVER_WORKER_COUNT] = { 0 };
  176. int c[CLIENT_COUNT] = { 0 };
  177. int total = 0;
  178. while(1) {
  179. Sleep(1000);
  180. if (client_ready < CLIENT_COUNT) continue;
  181. // check workers
  182. for(int i = 0; i < SERVER_WORKER_COUNT; i++) {
  183. if(w[i] == worker_cnt[i]) {
  184. fprintf(stderr, "worker %d not work \n", i);
  185. }
  186. w[i] = worker_cnt[i];
  187. }
  188. // check clients
  189. int t = 0;
  190. for (int i = 0; i < CLIENT_COUNT; i++) {
  191. if (c[i] == client_cnt[i]) {
  192. fprintf(stderr, "client %d not work \n", i);
  193. }
  194. c[i] = client_cnt[i];
  195. t += c[i];
  196. }
  197. printf("\rTotal %d messages. Speed %d per second ", t, t - total);
  198. total = t;
  199. }
  200. return 0;
  201. }