zhelpers.hpp 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. #ifndef __ZHELPERS_HPP_INCLUDED__
  2. #define __ZHELPERS_HPP_INCLUDED__
  3. // Include a bunch of headers that we will need in the examples
  4. #include <zmq.hpp> // https://github.com/zeromq/cppzmq
  5. #include <iostream>
  6. #include <iomanip>
  7. #include <string>
  8. #include <sstream>
  9. #include <time.h>
  10. #include <assert.h>
  11. #include <stdlib.h> // random() RAND_MAX
  12. #include <stdio.h>
  13. #include <stdarg.h>
  14. #include <signal.h>
  15. #if (!defined(WIN32))
  16. # include <sys/time.h>
  17. # include <unistd.h>
  18. #endif
  19. // Bring Windows MSVC up to C99 scratch
  20. #if (defined (WIN32))
  21. typedef unsigned long ulong;
  22. typedef unsigned int uint;
  23. typedef __int64 int64_t;
  24. #endif
  25. // On some version of Windows, POSIX subsystem is not installed by default.
  26. // So define srandom and random ourself.
  27. //
  28. #if (defined (WIN32))
  29. # define srandom srand
  30. # define random rand
  31. #endif
  32. // Visual Studio versions below 2015 do not support sprintf properly. This is a workaround.
  33. // Taken from http://stackoverflow.com/questions/2915672/snprintf-and-visual-studio-2010
  34. #if defined(_MSC_VER) && _MSC_VER < 1900
  35. #define snprintf c99_snprintf
  36. #define vsnprintf c99_vsnprintf
  37. inline int c99_vsnprintf(char *outBuf, size_t size, const char *format, va_list ap)
  38. {
  39. int count = -1;
  40. if (size != 0)
  41. count = _vsnprintf_s(outBuf, size, _TRUNCATE, format, ap);
  42. if (count == -1)
  43. count = _vscprintf(format, ap);
  44. return count;
  45. }
  46. inline int c99_snprintf(char *outBuf, size_t size, const char *format, ...)
  47. {
  48. int count;
  49. va_list ap;
  50. va_start(ap, format);
  51. count = c99_vsnprintf(outBuf, size, format, ap);
  52. va_end(ap);
  53. return count;
  54. }
  55. #endif
  56. // Provide random number from 0..(num-1)
  57. #define within(num) (int) ((float)((num) * random ()) / (RAND_MAX + 1.0))
  58. // Receive 0MQ string from socket and convert into C string
  59. // Caller must free returned string.
  60. inline static char *
  61. s_recv(void *socket, int flags = 0) {
  62. zmq_msg_t message;
  63. zmq_msg_init(&message);
  64. int rc = zmq_msg_recv(&message, socket, flags);
  65. if (rc < 0)
  66. return nullptr; // Context terminated, exit
  67. size_t size = zmq_msg_size(&message);
  68. char *string = (char*)malloc(size + 1);
  69. memcpy(string, zmq_msg_data(&message), size);
  70. zmq_msg_close(&message);
  71. string[size] = 0;
  72. return (string);
  73. }
  74. // Receive 0MQ string from socket and convert into string
  75. inline static std::string
  76. s_recv (zmq::socket_t & socket, int flags = 0) {
  77. zmq::message_t message;
  78. socket.recv(&message, flags);
  79. return std::string(static_cast<char*>(message.data()), message.size());
  80. }
  81. inline static bool s_recv(zmq::socket_t & socket, std::string & ostring, int flags = 0)
  82. {
  83. zmq::message_t message;
  84. bool rc = socket.recv(&message, flags);
  85. if (rc) {
  86. ostring = std::string(static_cast<char*>(message.data()), message.size());
  87. }
  88. return (rc);
  89. }
  90. // Convert C string to 0MQ string and send to socket
  91. inline static int
  92. s_send(void *socket, const char *string, int flags = 0) {
  93. int rc;
  94. zmq_msg_t message;
  95. zmq_msg_init_size(&message, strlen(string));
  96. memcpy(zmq_msg_data(&message), string, strlen(string));
  97. rc = zmq_msg_send(&message, socket, flags);
  98. assert(-1 != rc);
  99. zmq_msg_close(&message);
  100. return (rc);
  101. }
  102. // Convert string to 0MQ string and send to socket
  103. inline static bool
  104. s_send (zmq::socket_t & socket, const std::string & string, int flags = 0) {
  105. zmq::message_t message(string.size());
  106. memcpy (message.data(), string.data(), string.size());
  107. bool rc = socket.send (message, flags);
  108. return (rc);
  109. }
  110. // Sends string as 0MQ string, as multipart non-terminal
  111. inline static int
  112. s_sendmore(void *socket, char *string) {
  113. int rc;
  114. zmq_msg_t message;
  115. zmq_msg_init_size(&message, strlen(string));
  116. memcpy(zmq_msg_data(&message), string, strlen(string));
  117. //rc = zmq_send(socket, string, strlen(string), ZMQ_SNDMORE);
  118. rc = zmq_msg_send(&message, socket, ZMQ_SNDMORE);
  119. assert(-1 != rc);
  120. zmq_msg_close(&message);
  121. return (rc);
  122. }
  123. // Sends string as 0MQ string, as multipart non-terminal
  124. inline static bool
  125. s_sendmore (zmq::socket_t & socket, const std::string & string) {
  126. zmq::message_t message(string.size());
  127. memcpy (message.data(), string.data(), string.size());
  128. bool rc = socket.send (message, ZMQ_SNDMORE);
  129. return (rc);
  130. }
  131. // Receives all message parts from socket, prints neatly
  132. //
  133. inline static void
  134. s_dump (zmq::socket_t & socket)
  135. {
  136. std::cout << "----------------------------------------" << std::endl;
  137. while (1) {
  138. // Process all parts of the message
  139. zmq::message_t message;
  140. socket.recv(&message);
  141. // Dump the message as text or binary
  142. size_t size = message.size();
  143. std::string data(static_cast<char*>(message.data()), size);
  144. bool is_text = true;
  145. size_t char_nbr;
  146. unsigned char byte;
  147. for (char_nbr = 0; char_nbr < size; char_nbr++) {
  148. byte = data [char_nbr];
  149. if (byte < 32 || byte > 127)
  150. is_text = false;
  151. }
  152. std::cout << "[" << std::setfill('0') << std::setw(3) << size << "]";
  153. for (char_nbr = 0; char_nbr < size; char_nbr++) {
  154. if (is_text)
  155. std::cout << (char)data [char_nbr];
  156. else
  157. std::cout << std::setfill('0') << std::setw(2)
  158. << std::hex << (unsigned int) data [char_nbr];
  159. }
  160. std::cout << std::endl;
  161. int more = 0; // Multipart detection
  162. size_t more_size = sizeof (more);
  163. socket.getsockopt (ZMQ_RCVMORE, &more, &more_size);
  164. if (!more)
  165. break; // Last message part
  166. }
  167. }
  168. #if (!defined (WIN32))
  169. // Set simple random printable identity on socket
  170. // Caution:
  171. // DO NOT call this version of s_set_id from multiple threads on MS Windows
  172. // since s_set_id will call rand() on MS Windows. rand(), however, is not
  173. // reentrant or thread-safe. See issue #521.
  174. inline std::string
  175. s_set_id (zmq::socket_t & socket)
  176. {
  177. std::stringstream ss;
  178. ss << std::hex << std::uppercase
  179. << std::setw(4) << std::setfill('0') << within (0x10000) << "-"
  180. << std::setw(4) << std::setfill('0') << within (0x10000);
  181. socket.setsockopt(ZMQ_IDENTITY, ss.str().c_str(), ss.str().length());
  182. return ss.str();
  183. }
  184. #else
  185. // Fix #521
  186. inline std::string
  187. s_set_id(zmq::socket_t & socket, intptr_t id)
  188. {
  189. std::stringstream ss;
  190. ss << std::hex << std::uppercase
  191. << std::setw(4) << std::setfill('0') << id;
  192. socket.setsockopt(ZMQ_IDENTITY, ss.str().c_str(), ss.str().length());
  193. return ss.str();
  194. }
  195. #endif
  196. // Report 0MQ version number
  197. //
  198. inline static void
  199. s_version (void)
  200. {
  201. int major, minor, patch;
  202. zmq_version (&major, &minor, &patch);
  203. std::cout << "Current 0MQ version is " << major << "." << minor << "." << patch << std::endl;
  204. }
  205. inline static void
  206. s_version_assert (int want_major, int want_minor)
  207. {
  208. int major, minor, patch;
  209. zmq_version (&major, &minor, &patch);
  210. if (major < want_major
  211. || (major == want_major && minor < want_minor)) {
  212. std::cout << "Current 0MQ version is " << major << "." << minor << std::endl;
  213. std::cout << "Application needs at least " << want_major << "." << want_minor
  214. << " - cannot continue" << std::endl;
  215. exit (EXIT_FAILURE);
  216. }
  217. }
  218. // Return current system clock as milliseconds
  219. inline static int64_t
  220. s_clock (void)
  221. {
  222. #if (defined (WIN32))
  223. FILETIME fileTime;
  224. GetSystemTimeAsFileTime(&fileTime);
  225. unsigned __int64 largeInt = fileTime.dwHighDateTime;
  226. largeInt <<= 32;
  227. largeInt |= fileTime.dwLowDateTime;
  228. largeInt /= 10000; // FILETIME is in units of 100 nanoseconds
  229. return (int64_t)largeInt;
  230. #else
  231. struct timeval tv;
  232. gettimeofday (&tv, NULL);
  233. return (int64_t) (tv.tv_sec * 1000 + tv.tv_usec / 1000);
  234. #endif
  235. }
  236. // Sleep for a number of milliseconds
  237. inline static void
  238. s_sleep (int msecs)
  239. {
  240. #if (defined (WIN32))
  241. Sleep (msecs);
  242. #else
  243. struct timespec t;
  244. t.tv_sec = msecs / 1000;
  245. t.tv_nsec = (msecs % 1000) * 1000000;
  246. nanosleep (&t, NULL);
  247. #endif
  248. }
  249. inline static void
  250. s_console (const char *format, ...)
  251. {
  252. time_t curtime = time (NULL);
  253. struct tm *loctime = localtime (&curtime);
  254. char *formatted = new char[20];
  255. strftime (formatted, 20, "%y-%m-%d %H:%M:%S ", loctime);
  256. printf ("%s", formatted);
  257. delete[] formatted;
  258. va_list argptr;
  259. va_start (argptr, format);
  260. vprintf (format, argptr);
  261. va_end (argptr);
  262. printf ("\n");
  263. }
  264. // ---------------------------------------------------------------------
  265. // Signal handling
  266. //
  267. // Call s_catch_signals() in your application at startup, and then exit
  268. // your main loop if s_interrupted is ever 1. Works especially well with
  269. // zmq_poll.
  270. static int s_interrupted = 0;
  271. inline static void s_signal_handler (int signal_value)
  272. {
  273. s_interrupted = 1;
  274. }
  275. inline static void s_catch_signals ()
  276. {
  277. #if (!defined(WIN32))
  278. struct sigaction action;
  279. action.sa_handler = s_signal_handler;
  280. action.sa_flags = 0;
  281. sigemptyset (&action.sa_mask);
  282. sigaction (SIGINT, &action, NULL);
  283. sigaction (SIGTERM, &action, NULL);
  284. #endif
  285. }
  286. #endif