123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229 |
- /*
- Server thread listen ZMQ_SERVER socket and transfer incoming message
- to worker threads by ZMQ_PUSH-ZMQ_PULL
- Worker thread receive message and send back to ZMQ_SERVER socket
- Each client thread open CLIENT_CONNECTION ZMQ_CLIENT sockets,
- send random size message to each socket and check server answer
- */
- #define ZMQ_BUILD_DRAFT_API
- #include "../../../../include/zmq.h"
- #pragma comment(lib,"libzmq.lib")
- #include <assert.h>
- #include <stdlib.h>
- #include <thread>
- #include <atomic>
- #define SERVER_ADDR "tcp://127.0.0.1:12345"
- #define SERVER_WORKER_COUNT 3 // worker threads count
- #define CLIENT_COUNT 5 // client threads count
- #define CLIENT_CONNECTION 100 // ZMQ_CLIENT sockets at each client
- #define CLIENT_RECCONECT 1000 // reconnect one socket after messages
- #define MESSAGE_MAX_SIZE 1024
- //*******************************************************************
- //****** MESSAGE ****************************************************
- //*******************************************************************
- void message_fill(zmq_msg_t* msg, int val) {
- assert(val > 0);
- int size = sizeof(int) * 2 + val;
- int rc = zmq_msg_init_size(msg, size); assert(rc == 0);
- uint8_t* data = (uint8_t*)zmq_msg_data(msg);
- memcpy(data, &val, sizeof(int));
- data += sizeof(int);
- memset(data, val & 0xFF, val);
- int check_sum = val + (val & 0xFF) * val;
- data += val;
- memcpy(data, &check_sum, sizeof(int));
- }
- int message_check(zmq_msg_t* msg) {
- uint8_t* data = (uint8_t*)zmq_msg_data(msg);
- int size = zmq_msg_size(msg);
- assert(size > sizeof(int) * 2);
- // check size
- int val;
- memcpy(&val, data, sizeof(int));
- if(size != sizeof(int) * 2 + val) {
- fprintf(stderr, "wrong message: val = %d size = %d\n", val, size);
- return -1;
- }
- // check sum
- data += sizeof(int);
- int cs = val;
- for(int i = 0; i < val; i++) {
- cs += data[i];
- }
- data += val;
- int check_sum;
- memcpy(&check_sum, data, sizeof(int));
- if(check_sum != cs) {
- fprintf(stderr, "wrong message: cs = %d check_sum = %d\n", cs, check_sum);
- return -1;
- }
- return val;
- }
- //*******************************************************************
- //****** SERVER *****************************************************
- //*******************************************************************
- void *server_ctx = NULL;
- void *server_sock = NULL;
- std::atomic<int> worker_cnt[SERVER_WORKER_COUNT] = {0}; // statistic
- // worker thread
- void worker(int num) {
- printf("worker %d start\n", num);
- void* queue = zmq_socket(server_ctx, ZMQ_PULL); assert(queue);
- int rc = zmq_connect(queue, "inproc://queue"); assert(rc == 0);
- while (1) {
- // receive messages from the queue
- zmq_msg_t msg;
- rc = zmq_msg_init(&msg); assert(rc == 0);
- rc = zmq_msg_recv(&msg, queue, 0); assert(rc > 0);
- // check message
- //printf("worker %d recv %d bytes at %X from %X\n", num, zmq_msg_size(&msg), zmq_msg_data(&msg), zmq_msg_routing_id(&msg));
- // send to client
- rc = zmq_msg_send(&msg, server_sock, 0); assert(rc != -1);
- worker_cnt[num]++;
- }
- zmq_close(queue);
- }
- // server thread
- void server() {
- server_ctx = zmq_ctx_new(); assert(server_ctx);
- // create queue
- void* queue = zmq_socket(server_ctx, ZMQ_PUSH); assert(queue);
- int rc = zmq_bind(queue, "inproc://queue"); assert(rc == 0);
- // start workers
- std::thread w[SERVER_WORKER_COUNT];
- for (int i = 0; i < SERVER_WORKER_COUNT; i++) w[i] = std::thread(worker, i);
- // ZMQ_SERVER for client messages
- server_sock = zmq_socket(server_ctx, ZMQ_SERVER); assert(server_sock);
- rc = zmq_bind(server_sock, SERVER_ADDR); assert(rc == 0);
- while (1) {
- // wait client message
- zmq_msg_t msg;
- rc = zmq_msg_init(&msg); assert(rc == 0);
- rc = zmq_msg_recv(&msg, server_sock, 0); assert(rc > 0);
- //printf("recv %d bytes at %X from %X\n", zmq_msg_size(&msg), zmq_msg_data(&msg), zmq_msg_routing_id(&msg));
- // send message to queue
- rc = zmq_msg_send(&msg, queue, 0); assert(rc > 0);
- }
- }
- //*******************************************************************
- //****** CLIENT *****************************************************
- //*******************************************************************
- std::atomic<int> client_cnt[CLIENT_COUNT] = { 0 }; // statistic
- std::atomic<int> client_ready = 0;
- // client thread
- void client(int num)
- {
- //printf("client %d start. Open %d connections\n", num, CLIENT_CONNECTION);
- void *ctx = zmq_ctx_new(); assert(ctx);
- void *sock[CLIENT_CONNECTION];
- int rc;
- // open ZMQ_CLIENT connections
- for (int i = 0; i < CLIENT_CONNECTION; i++) {
- sock[i] = zmq_socket(ctx, ZMQ_CLIENT); assert(sock[i]);
- rc = zmq_connect(sock[i], SERVER_ADDR); assert(rc == 0);
- // test connection
- zmq_msg_t msg;
- int v = rand() % 256 + 1;
- message_fill(&msg, v);
- rc = zmq_msg_send(&msg, sock[i], 0); assert(rc > 0);
- rc = zmq_msg_init(&msg); assert(rc == 0);
- rc = zmq_msg_recv(&msg, sock[i], 0); assert(rc > 0);
- rc = message_check(&msg); assert(rc == v);
- zmq_msg_close(&msg);
- }
- printf("client %d open %d connections\n", num, CLIENT_CONNECTION);
- client_ready++;
- while (client_ready < CLIENT_COUNT) Sleep(10); // wait while all clients open sockets
- int recconect = 0;
- while(1) {
- int val[CLIENT_CONNECTION];
- zmq_msg_t msg;
- // send messages
- for(int i = 0; i < CLIENT_CONNECTION; i++) {
- val[i] = rand() % MESSAGE_MAX_SIZE + 1;
- message_fill(&msg, val[i]);
- rc = zmq_msg_send(&msg, sock[i], 0); assert(rc > 0);
- }
- // recv and check
- for (int i = 0; i < CLIENT_CONNECTION; i++) {
- rc = zmq_msg_init(&msg); assert(rc == 0);
- rc = zmq_msg_recv(&msg, sock[i], 0); assert(rc > 0);
- rc = message_check(&msg);
- if(rc != val[i] && rc > 0) {
- fprintf(stderr, "wrong message: send %d recv %d \n", val[i], rc);
- }
- zmq_msg_close(&msg);
- client_cnt[num]++;
- }
- // reconnect one
- recconect++;
- if(recconect == CLIENT_RECCONECT) {
- int n = rand() % CLIENT_CONNECTION;
- zmq_close(sock[n]);
- sock[n] = zmq_socket(ctx, ZMQ_CLIENT); assert(sock[n]);
- int rc = zmq_connect(sock[n], SERVER_ADDR); assert(rc == 0);
- }
- }
- }
- //*******************************************************************
- int main (void) {
- int v1, v2, v3; zmq_version(&v1, &v2, &v3);
- printf("ZMQ version %d.%d.%d. Compile %s %s\n", v1, v2, v3, __DATE__, __TIME__);
- std::thread ct[CLIENT_COUNT];
- for (int i = 0; i < CLIENT_COUNT; i++) ct[i] = std::thread(client, i);
- std::thread st(server);
- int w[SERVER_WORKER_COUNT] = { 0 };
- int c[CLIENT_COUNT] = { 0 };
- int total = 0;
- while(1) {
- Sleep(1000);
- if (client_ready < CLIENT_COUNT) continue;
- // check workers
- for(int i = 0; i < SERVER_WORKER_COUNT; i++) {
- if(w[i] == worker_cnt[i]) {
- fprintf(stderr, "worker %d not work \n", i);
- }
- w[i] = worker_cnt[i];
- }
- // check clients
- int t = 0;
- for (int i = 0; i < CLIENT_COUNT; i++) {
- if (c[i] == client_cnt[i]) {
- fprintf(stderr, "client %d not work \n", i);
- }
- c[i] = client_cnt[i];
- t += c[i];
- }
- printf("\rTotal %d messages. Speed %d per second ", t, t - total);
- total = t;
- }
- return 0;
- }
|