signaler.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414
  1. /*
  2. Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
  3. This file is part of libzmq, the ZeroMQ core engine in C++.
  4. libzmq is free software; you can redistribute it and/or modify it under
  5. the terms of the GNU Lesser General Public License (LGPL) as published
  6. by the Free Software Foundation; either version 3 of the License, or
  7. (at your option) any later version.
  8. As a special exception, the Contributors give you permission to link
  9. this library with independent modules to produce an executable,
  10. regardless of the license terms of these independent modules, and to
  11. copy and distribute the resulting executable under terms of your choice,
  12. provided that you also meet, for each linked independent module, the
  13. terms and conditions of the license of that module. An independent
  14. module is a module which is not derived from or based on this library.
  15. If you modify this library, you must extend this exception to your
  16. version of the library.
  17. libzmq is distributed in the hope that it will be useful, but WITHOUT
  18. ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  19. FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
  20. License for more details.
  21. You should have received a copy of the GNU Lesser General Public License
  22. along with this program. If not, see <http://www.gnu.org/licenses/>.
  23. */
  24. #include "precompiled.hpp"
  25. #include "poller.hpp"
  26. #include "polling_util.hpp"
  27. #if defined ZMQ_POLL_BASED_ON_POLL
  28. #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_AIX
  29. #include <poll.h>
  30. #endif
  31. #elif defined ZMQ_POLL_BASED_ON_SELECT
  32. #if defined ZMQ_HAVE_WINDOWS
  33. #elif defined ZMQ_HAVE_HPUX
  34. #include <sys/param.h>
  35. #include <sys/types.h>
  36. #include <sys/time.h>
  37. #elif defined ZMQ_HAVE_OPENVMS
  38. #include <sys/types.h>
  39. #include <sys/time.h>
  40. #elif defined ZMQ_HAVE_VXWORKS
  41. #include <sys/types.h>
  42. #include <sys/time.h>
  43. #include <sockLib.h>
  44. #include <strings.h>
  45. #else
  46. #include <sys/select.h>
  47. #endif
  48. #endif
  49. #include "signaler.hpp"
  50. #include "likely.hpp"
  51. #include "stdint.hpp"
  52. #include "config.hpp"
  53. #include "err.hpp"
  54. #include "fd.hpp"
  55. #include "ip.hpp"
  56. #include "tcp.hpp"
  57. #if !defined ZMQ_HAVE_WINDOWS
  58. #include <unistd.h>
  59. #include <netinet/tcp.h>
  60. #include <sys/types.h>
  61. #include <sys/socket.h>
  62. #endif
  63. #if !defined(ZMQ_HAVE_WINDOWS)
  64. // Helper to sleep for specific number of milliseconds (or until signal)
  65. //
  66. static int sleep_ms (unsigned int ms_)
  67. {
  68. if (ms_ == 0)
  69. return 0;
  70. #if defined ZMQ_HAVE_ANDROID
  71. usleep (ms_ * 1000);
  72. return 0;
  73. #elif defined ZMQ_HAVE_VXWORKS
  74. struct timespec ns_;
  75. ns_.tv_sec = ms_ / 1000;
  76. ns_.tv_nsec = ms_ % 1000 * 1000000;
  77. return nanosleep (&ns_, 0);
  78. #else
  79. return usleep (ms_ * 1000);
  80. #endif
  81. }
  82. // Helper to wait on close(), for non-blocking sockets, until it completes
  83. // If EAGAIN is received, will sleep briefly (1-100ms) then try again, until
  84. // the overall timeout is reached.
  85. //
  86. static int close_wait_ms (int fd_, unsigned int max_ms_ = 2000)
  87. {
  88. unsigned int ms_so_far = 0;
  89. const unsigned int min_step_ms = 1;
  90. const unsigned int max_step_ms = 100;
  91. const unsigned int step_ms =
  92. std::min (std::max (min_step_ms, max_ms_ / 10), max_step_ms);
  93. int rc = 0; // do not sleep on first attempt
  94. do {
  95. if (rc == -1 && errno == EAGAIN) {
  96. sleep_ms (step_ms);
  97. ms_so_far += step_ms;
  98. }
  99. rc = close (fd_);
  100. } while (ms_so_far < max_ms_ && rc == -1 && errno == EAGAIN);
  101. return rc;
  102. }
  103. #endif
  104. zmq::signaler_t::signaler_t ()
  105. {
  106. // Create the socketpair for signaling.
  107. if (make_fdpair (&_r, &_w) == 0) {
  108. unblock_socket (_w);
  109. unblock_socket (_r);
  110. }
  111. #ifdef HAVE_FORK
  112. pid = getpid ();
  113. #endif
  114. }
  115. // This might get run after some part of construction failed, leaving one or
  116. // both of _r and _w retired_fd.
  117. zmq::signaler_t::~signaler_t ()
  118. {
  119. #if defined ZMQ_HAVE_EVENTFD
  120. if (_r == retired_fd)
  121. return;
  122. int rc = close_wait_ms (_r);
  123. errno_assert (rc == 0);
  124. #elif defined ZMQ_HAVE_WINDOWS
  125. if (_w != retired_fd) {
  126. const struct linger so_linger = {1, 0};
  127. int rc = setsockopt (_w, SOL_SOCKET, SO_LINGER,
  128. reinterpret_cast<const char *> (&so_linger),
  129. sizeof so_linger);
  130. // Only check shutdown if WSASTARTUP was previously done
  131. if (rc == 0 || WSAGetLastError () != WSANOTINITIALISED) {
  132. wsa_assert (rc != SOCKET_ERROR);
  133. rc = closesocket (_w);
  134. wsa_assert (rc != SOCKET_ERROR);
  135. if (_r == retired_fd)
  136. return;
  137. rc = closesocket (_r);
  138. wsa_assert (rc != SOCKET_ERROR);
  139. }
  140. }
  141. #else
  142. if (_w != retired_fd) {
  143. int rc = close_wait_ms (_w);
  144. errno_assert (rc == 0);
  145. }
  146. if (_r != retired_fd) {
  147. int rc = close_wait_ms (_r);
  148. errno_assert (rc == 0);
  149. }
  150. #endif
  151. }
  152. zmq::fd_t zmq::signaler_t::get_fd () const
  153. {
  154. return _r;
  155. }
  156. void zmq::signaler_t::send ()
  157. {
  158. #if defined HAVE_FORK
  159. if (unlikely (pid != getpid ())) {
  160. //printf("Child process %d signaler_t::send returning without sending #1\n", getpid());
  161. return; // do not send anything in forked child context
  162. }
  163. #endif
  164. #if defined ZMQ_HAVE_EVENTFD
  165. const uint64_t inc = 1;
  166. ssize_t sz = write (_w, &inc, sizeof (inc));
  167. errno_assert (sz == sizeof (inc));
  168. #elif defined ZMQ_HAVE_WINDOWS
  169. const char dummy = 0;
  170. int nbytes;
  171. do {
  172. nbytes = ::send (_w, &dummy, sizeof (dummy), 0);
  173. wsa_assert (nbytes != SOCKET_ERROR);
  174. // wsa_assert does not abort on WSAEWOULDBLOCK. If we get this, we retry.
  175. } while (nbytes == SOCKET_ERROR);
  176. // Given the small size of dummy (should be 1) expect that send was able to send everything.
  177. zmq_assert (nbytes == sizeof (dummy));
  178. #elif defined ZMQ_HAVE_VXWORKS
  179. unsigned char dummy = 0;
  180. while (true) {
  181. ssize_t nbytes = ::send (_w, (char *) &dummy, sizeof (dummy), 0);
  182. if (unlikely (nbytes == -1 && errno == EINTR))
  183. continue;
  184. #if defined(HAVE_FORK)
  185. if (unlikely (pid != getpid ())) {
  186. //printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
  187. errno = EINTR;
  188. break;
  189. }
  190. #endif
  191. zmq_assert (nbytes == sizeof dummy);
  192. break;
  193. }
  194. #else
  195. unsigned char dummy = 0;
  196. while (true) {
  197. ssize_t nbytes = ::send (_w, &dummy, sizeof (dummy), 0);
  198. if (unlikely (nbytes == -1 && errno == EINTR))
  199. continue;
  200. #if defined(HAVE_FORK)
  201. if (unlikely (pid != getpid ())) {
  202. //printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
  203. errno = EINTR;
  204. break;
  205. }
  206. #endif
  207. zmq_assert (nbytes == sizeof dummy);
  208. break;
  209. }
  210. #endif
  211. }
  212. int zmq::signaler_t::wait (int timeout_) const
  213. {
  214. #ifdef HAVE_FORK
  215. if (unlikely (pid != getpid ())) {
  216. // we have forked and the file descriptor is closed. Emulate an interrupt
  217. // response.
  218. //printf("Child process %d signaler_t::wait returning simulating interrupt #1\n", getpid());
  219. errno = EINTR;
  220. return -1;
  221. }
  222. #endif
  223. #ifdef ZMQ_POLL_BASED_ON_POLL
  224. struct pollfd pfd;
  225. pfd.fd = _r;
  226. pfd.events = POLLIN;
  227. const int rc = poll (&pfd, 1, timeout_);
  228. if (unlikely (rc < 0)) {
  229. errno_assert (errno == EINTR);
  230. return -1;
  231. }
  232. if (unlikely (rc == 0)) {
  233. errno = EAGAIN;
  234. return -1;
  235. }
  236. #ifdef HAVE_FORK
  237. if (unlikely (pid != getpid ())) {
  238. // we have forked and the file descriptor is closed. Emulate an interrupt
  239. // response.
  240. //printf("Child process %d signaler_t::wait returning simulating interrupt #2\n", getpid());
  241. errno = EINTR;
  242. return -1;
  243. }
  244. #endif
  245. zmq_assert (rc == 1);
  246. zmq_assert (pfd.revents & POLLIN);
  247. return 0;
  248. #elif defined ZMQ_POLL_BASED_ON_SELECT
  249. optimized_fd_set_t fds (1);
  250. FD_ZERO (fds.get ());
  251. FD_SET (_r, fds.get ());
  252. struct timeval timeout;
  253. if (timeout_ >= 0) {
  254. timeout.tv_sec = timeout_ / 1000;
  255. timeout.tv_usec = timeout_ % 1000 * 1000;
  256. }
  257. #ifdef ZMQ_HAVE_WINDOWS
  258. int rc =
  259. select (0, fds.get (), NULL, NULL, timeout_ >= 0 ? &timeout : NULL);
  260. wsa_assert (rc != SOCKET_ERROR);
  261. #else
  262. int rc =
  263. select (_r + 1, fds.get (), NULL, NULL, timeout_ >= 0 ? &timeout : NULL);
  264. if (unlikely (rc < 0)) {
  265. errno_assert (errno == EINTR);
  266. return -1;
  267. }
  268. #endif
  269. if (unlikely (rc == 0)) {
  270. errno = EAGAIN;
  271. return -1;
  272. }
  273. zmq_assert (rc == 1);
  274. return 0;
  275. #else
  276. #error
  277. #endif
  278. }
  279. void zmq::signaler_t::recv ()
  280. {
  281. // Attempt to read a signal.
  282. #if defined ZMQ_HAVE_EVENTFD
  283. uint64_t dummy;
  284. ssize_t sz = read (_r, &dummy, sizeof (dummy));
  285. errno_assert (sz == sizeof (dummy));
  286. // If we accidentally grabbed the next signal(s) along with the current
  287. // one, return it back to the eventfd object.
  288. if (unlikely (dummy > 1)) {
  289. const uint64_t inc = dummy - 1;
  290. ssize_t sz2 = write (_w, &inc, sizeof (inc));
  291. errno_assert (sz2 == sizeof (inc));
  292. return;
  293. }
  294. zmq_assert (dummy == 1);
  295. #else
  296. unsigned char dummy;
  297. #if defined ZMQ_HAVE_WINDOWS
  298. const int nbytes =
  299. ::recv (_r, reinterpret_cast<char *> (&dummy), sizeof (dummy), 0);
  300. wsa_assert (nbytes != SOCKET_ERROR);
  301. #elif defined ZMQ_HAVE_VXWORKS
  302. ssize_t nbytes = ::recv (_r, (char *) &dummy, sizeof (dummy), 0);
  303. errno_assert (nbytes >= 0);
  304. #else
  305. ssize_t nbytes = ::recv (_r, &dummy, sizeof (dummy), 0);
  306. errno_assert (nbytes >= 0);
  307. #endif
  308. zmq_assert (nbytes == sizeof (dummy));
  309. zmq_assert (dummy == 0);
  310. #endif
  311. }
  312. int zmq::signaler_t::recv_failable ()
  313. {
  314. // Attempt to read a signal.
  315. #if defined ZMQ_HAVE_EVENTFD
  316. uint64_t dummy;
  317. ssize_t sz = read (_r, &dummy, sizeof (dummy));
  318. if (sz == -1) {
  319. errno_assert (errno == EAGAIN);
  320. return -1;
  321. }
  322. errno_assert (sz == sizeof (dummy));
  323. // If we accidentally grabbed the next signal(s) along with the current
  324. // one, return it back to the eventfd object.
  325. if (unlikely (dummy > 1)) {
  326. const uint64_t inc = dummy - 1;
  327. ssize_t sz2 = write (_w, &inc, sizeof (inc));
  328. errno_assert (sz2 == sizeof (inc));
  329. return 0;
  330. }
  331. zmq_assert (dummy == 1);
  332. #else
  333. unsigned char dummy;
  334. #if defined ZMQ_HAVE_WINDOWS
  335. const int nbytes =
  336. ::recv (_r, reinterpret_cast<char *> (&dummy), sizeof (dummy), 0);
  337. if (nbytes == SOCKET_ERROR) {
  338. const int last_error = WSAGetLastError ();
  339. if (last_error == WSAEWOULDBLOCK) {
  340. errno = EAGAIN;
  341. return -1;
  342. }
  343. wsa_assert (last_error == WSAEWOULDBLOCK);
  344. }
  345. #elif defined ZMQ_HAVE_VXWORKS
  346. ssize_t nbytes = ::recv (_r, (char *) &dummy, sizeof (dummy), 0);
  347. if (nbytes == -1) {
  348. if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
  349. errno = EAGAIN;
  350. return -1;
  351. }
  352. errno_assert (errno == EAGAIN || errno == EWOULDBLOCK
  353. || errno == EINTR);
  354. }
  355. #else
  356. ssize_t nbytes = ::recv (_r, &dummy, sizeof (dummy), 0);
  357. if (nbytes == -1) {
  358. if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
  359. errno = EAGAIN;
  360. return -1;
  361. }
  362. errno_assert (errno == EAGAIN || errno == EWOULDBLOCK
  363. || errno == EINTR);
  364. }
  365. #endif
  366. zmq_assert (nbytes == sizeof (dummy));
  367. zmq_assert (dummy == 0);
  368. #endif
  369. return 0;
  370. }
  371. bool zmq::signaler_t::valid () const
  372. {
  373. return _w != retired_fd;
  374. }
  375. #ifdef HAVE_FORK
  376. void zmq::signaler_t::forked ()
  377. {
  378. // Close file descriptors created in the parent and create new pair
  379. close (_r);
  380. close (_w);
  381. make_fdpair (&_r, &_w);
  382. }
  383. #endif