socket_base.cpp 68 KB


  1. /*
  2. Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
  3. This file is part of libzmq, the ZeroMQ core engine in C++.
  4. libzmq is free software; you can redistribute it and/or modify it under
  5. the terms of the GNU Lesser General Public License (LGPL) as published
  6. by the Free Software Foundation; either version 3 of the License, or
  7. (at your option) any later version.
  8. As a special exception, the Contributors give you permission to link
  9. this library with independent modules to produce an executable,
  10. regardless of the license terms of these independent modules, and to
  11. copy and distribute the resulting executable under terms of your choice,
  12. provided that you also meet, for each linked independent module, the
  13. terms and conditions of the license of that module. An independent
  14. module is a module which is not derived from or based on this library.
  15. If you modify this library, you must extend this exception to your
  16. version of the library.
  17. libzmq is distributed in the hope that it will be useful, but WITHOUT
  18. ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  19. FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
  20. License for more details.
  21. You should have received a copy of the GNU Lesser General Public License
  22. along with this program. If not, see <http://www.gnu.org/licenses/>.
  23. */
  24. #include "precompiled.hpp"
  25. #include <new>
  26. #include <string>
  27. #include <algorithm>
  28. #include <limits>
  29. #include "macros.hpp"
  30. #if defined ZMQ_HAVE_WINDOWS
  31. #if defined _MSC_VER
  32. #if defined _WIN32_WCE
  33. #include <cmnintrin.h>
  34. #else
  35. #include <intrin.h>
  36. #endif
  37. #endif
  38. #else
  39. #include <unistd.h>
  40. #include <ctype.h>
  41. #endif
  42. #include "socket_base.hpp"
  43. #include "tcp_listener.hpp"
  44. #include "ws_listener.hpp"
  45. #include "ipc_listener.hpp"
  46. #include "tipc_listener.hpp"
  47. #include "tcp_connecter.hpp"
  48. #ifdef ZMQ_HAVE_WS
  49. #include "ws_address.hpp"
  50. #endif
  51. #include "io_thread.hpp"
  52. #include "session_base.hpp"
  53. #include "config.hpp"
  54. #include "pipe.hpp"
  55. #include "err.hpp"
  56. #include "ctx.hpp"
  57. #include "likely.hpp"
  58. #include "msg.hpp"
  59. #include "address.hpp"
  60. #include "ipc_address.hpp"
  61. #include "tcp_address.hpp"
  62. #include "udp_address.hpp"
  63. #include "tipc_address.hpp"
  64. #include "mailbox.hpp"
  65. #include "mailbox_safe.hpp"
  66. #ifdef ZMQ_HAVE_WSS
  67. #include "wss_address.hpp"
  68. #endif
  69. #if defined ZMQ_HAVE_VMCI
  70. #include "vmci_address.hpp"
  71. #include "vmci_listener.hpp"
  72. #endif
  73. #ifdef ZMQ_HAVE_OPENPGM
  74. #include "pgm_socket.hpp"
  75. #endif
  76. #include "pair.hpp"
  77. #include "pub.hpp"
  78. #include "sub.hpp"
  79. #include "req.hpp"
  80. #include "rep.hpp"
  81. #include "pull.hpp"
  82. #include "push.hpp"
  83. #include "dealer.hpp"
  84. #include "router.hpp"
  85. #include "xpub.hpp"
  86. #include "xsub.hpp"
  87. #include "stream.hpp"
  88. #include "server.hpp"
  89. #include "client.hpp"
  90. #include "radio.hpp"
  91. #include "dish.hpp"
  92. #include "gather.hpp"
  93. #include "scatter.hpp"
  94. #include "dgram.hpp"
  95. #include "peer.hpp"
  96. #include "channel.hpp"
  97. void zmq::socket_base_t::inprocs_t::emplace (const char *endpoint_uri_,
  98. pipe_t *pipe_)
  99. {
  100. _inprocs.ZMQ_MAP_INSERT_OR_EMPLACE (std::string (endpoint_uri_), pipe_);
  101. }
  102. int zmq::socket_base_t::inprocs_t::erase_pipes (
  103. const std::string &endpoint_uri_str_)
  104. {
  105. const std::pair<map_t::iterator, map_t::iterator> range =
  106. _inprocs.equal_range (endpoint_uri_str_);
  107. if (range.first == range.second) {
  108. errno = ENOENT;
  109. return -1;
  110. }
  111. for (map_t::iterator it = range.first; it != range.second; ++it) {
  112. it->second->send_disconnect_msg ();
  113. it->second->terminate (true);
  114. }
  115. _inprocs.erase (range.first, range.second);
  116. return 0;
  117. }
  118. void zmq::socket_base_t::inprocs_t::erase_pipe (const pipe_t *pipe_)
  119. {
  120. for (map_t::iterator it = _inprocs.begin (), end = _inprocs.end ();
  121. it != end; ++it)
  122. if (it->second == pipe_) {
  123. _inprocs.erase (it);
  124. break;
  125. }
  126. }
  127. bool zmq::socket_base_t::check_tag () const
  128. {
  129. return _tag == 0xbaddecaf;
  130. }
  131. bool zmq::socket_base_t::is_thread_safe () const
  132. {
  133. return _thread_safe;
  134. }
  135. zmq::socket_base_t *zmq::socket_base_t::create (int type_,
  136. class ctx_t *parent_,
  137. uint32_t tid_,
  138. int sid_)
  139. {
  140. socket_base_t *s = NULL;
  141. switch (type_) {
  142. case ZMQ_PAIR:
  143. s = new (std::nothrow) pair_t (parent_, tid_, sid_);
  144. break;
  145. case ZMQ_PUB:
  146. s = new (std::nothrow) pub_t (parent_, tid_, sid_);
  147. break;
  148. case ZMQ_SUB:
  149. s = new (std::nothrow) sub_t (parent_, tid_, sid_);
  150. break;
  151. case ZMQ_REQ:
  152. s = new (std::nothrow) req_t (parent_, tid_, sid_);
  153. break;
  154. case ZMQ_REP:
  155. s = new (std::nothrow) rep_t (parent_, tid_, sid_);
  156. break;
  157. case ZMQ_DEALER:
  158. s = new (std::nothrow) dealer_t (parent_, tid_, sid_);
  159. break;
  160. case ZMQ_ROUTER:
  161. s = new (std::nothrow) router_t (parent_, tid_, sid_);
  162. break;
  163. case ZMQ_PULL:
  164. s = new (std::nothrow) pull_t (parent_, tid_, sid_);
  165. break;
  166. case ZMQ_PUSH:
  167. s = new (std::nothrow) push_t (parent_, tid_, sid_);
  168. break;
  169. case ZMQ_XPUB:
  170. s = new (std::nothrow) xpub_t (parent_, tid_, sid_);
  171. break;
  172. case ZMQ_XSUB:
  173. s = new (std::nothrow) xsub_t (parent_, tid_, sid_);
  174. break;
  175. case ZMQ_STREAM:
  176. s = new (std::nothrow) stream_t (parent_, tid_, sid_);
  177. break;
  178. case ZMQ_SERVER:
  179. s = new (std::nothrow) server_t (parent_, tid_, sid_);
  180. break;
  181. case ZMQ_CLIENT:
  182. s = new (std::nothrow) client_t (parent_, tid_, sid_);
  183. break;
  184. case ZMQ_RADIO:
  185. s = new (std::nothrow) radio_t (parent_, tid_, sid_);
  186. break;
  187. case ZMQ_DISH:
  188. s = new (std::nothrow) dish_t (parent_, tid_, sid_);
  189. break;
  190. case ZMQ_GATHER:
  191. s = new (std::nothrow) gather_t (parent_, tid_, sid_);
  192. break;
  193. case ZMQ_SCATTER:
  194. s = new (std::nothrow) scatter_t (parent_, tid_, sid_);
  195. break;
  196. case ZMQ_DGRAM:
  197. s = new (std::nothrow) dgram_t (parent_, tid_, sid_);
  198. break;
  199. case ZMQ_PEER:
  200. s = new (std::nothrow) peer_t (parent_, tid_, sid_);
  201. break;
  202. case ZMQ_CHANNEL:
  203. s = new (std::nothrow) channel_t (parent_, tid_, sid_);
  204. break;
  205. default:
  206. errno = EINVAL;
  207. return NULL;
  208. }
  209. alloc_assert (s);
  210. if (s->_mailbox == NULL) {
  211. s->_destroyed = true;
  212. LIBZMQ_DELETE (s);
  213. return NULL;
  214. }
  215. return s;
  216. }
  217. zmq::socket_base_t::socket_base_t (ctx_t *parent_,
  218. uint32_t tid_,
  219. int sid_,
  220. bool thread_safe_) :
  221. own_t (parent_, tid_),
  222. _sync (),
  223. _tag (0xbaddecaf),
  224. _ctx_terminated (false),
  225. _destroyed (false),
  226. _poller (NULL),
  227. _handle (static_cast<poller_t::handle_t> (NULL)),
  228. _last_tsc (0),
  229. _ticks (0),
  230. _rcvmore (false),
  231. _monitor_socket (NULL),
  232. _monitor_events (0),
  233. _thread_safe (thread_safe_),
  234. _reaper_signaler (NULL),
  235. _monitor_sync ()
  236. {
  237. options.socket_id = sid_;
  238. options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
  239. options.linger.store (parent_->get (ZMQ_BLOCKY) ? -1 : 0);
  240. options.zero_copy = parent_->get (ZMQ_ZERO_COPY_RECV) != 0;
  241. if (_thread_safe) {
  242. _mailbox = new (std::nothrow) mailbox_safe_t (&_sync);
  243. zmq_assert (_mailbox);
  244. } else {
  245. mailbox_t *m = new (std::nothrow) mailbox_t ();
  246. zmq_assert (m);
  247. if (m->get_fd () != retired_fd)
  248. _mailbox = m;
  249. else {
  250. LIBZMQ_DELETE (m);
  251. _mailbox = NULL;
  252. }
  253. }
  254. }
  255. int zmq::socket_base_t::get_peer_state (const void *routing_id_,
  256. size_t routing_id_size_) const
  257. {
  258. LIBZMQ_UNUSED (routing_id_);
  259. LIBZMQ_UNUSED (routing_id_size_);
  260. // Only ROUTER sockets support this
  261. errno = ENOTSUP;
  262. return -1;
  263. }
  264. zmq::socket_base_t::~socket_base_t ()
  265. {
  266. if (_mailbox)
  267. LIBZMQ_DELETE (_mailbox);
  268. if (_reaper_signaler)
  269. LIBZMQ_DELETE (_reaper_signaler);
  270. scoped_lock_t lock (_monitor_sync);
  271. stop_monitor ();
  272. zmq_assert (_destroyed);
  273. }
  274. zmq::i_mailbox *zmq::socket_base_t::get_mailbox () const
  275. {
  276. return _mailbox;
  277. }
  278. void zmq::socket_base_t::stop ()
  279. {
  280. // Called by ctx when it is terminated (zmq_ctx_term).
  281. // 'stop' command is sent from the threads that called zmq_ctx_term to
  282. // the thread owning the socket. This way, blocking call in the
  283. // owner thread can be interrupted.
  284. send_stop ();
  285. }
  286. // TODO consider renaming protocol_ to scheme_ in conformance with RFC 3986
  287. // terminology, but this requires extensive changes to be consistent
  288. int zmq::socket_base_t::parse_uri (const char *uri_,
  289. std::string &protocol_,
  290. std::string &path_)
  291. {
  292. zmq_assert (uri_ != NULL);
  293. const std::string uri (uri_);
  294. const std::string::size_type pos = uri.find ("://");
  295. if (pos == std::string::npos) {
  296. errno = EINVAL;
  297. return -1;
  298. }
  299. protocol_ = uri.substr (0, pos);
  300. path_ = uri.substr (pos + 3);
  301. if (protocol_.empty () || path_.empty ()) {
  302. errno = EINVAL;
  303. return -1;
  304. }
  305. return 0;
  306. }
  307. int zmq::socket_base_t::check_protocol (const std::string &protocol_) const
  308. {
  309. // First check out whether the protocol is something we are aware of.
  310. if (protocol_ != protocol_name::inproc
  311. #if defined ZMQ_HAVE_IPC
  312. && protocol_ != protocol_name::ipc
  313. #endif
  314. && protocol_ != protocol_name::tcp
  315. #ifdef ZMQ_HAVE_WS
  316. && protocol_ != protocol_name::ws
  317. #endif
  318. #ifdef ZMQ_HAVE_WSS
  319. && protocol_ != protocol_name::wss
  320. #endif
  321. #if defined ZMQ_HAVE_OPENPGM
  322. // pgm/epgm transports only available if 0MQ is compiled with OpenPGM.
  323. && protocol_ != protocol_name::pgm
  324. && protocol_ != protocol_name::epgm
  325. #endif
  326. #if defined ZMQ_HAVE_TIPC
  327. // TIPC transport is only available on Linux.
  328. && protocol_ != protocol_name::tipc
  329. #endif
  330. #if defined ZMQ_HAVE_NORM
  331. && protocol_ != protocol_name::norm
  332. #endif
  333. #if defined ZMQ_HAVE_VMCI
  334. && protocol_ != protocol_name::vmci
  335. #endif
  336. && protocol_ != protocol_name::udp) {
  337. errno = EPROTONOSUPPORT;
  338. return -1;
  339. }
  340. // Check whether socket type and transport protocol match.
  341. // Specifically, multicast protocols can't be combined with
  342. // bi-directional messaging patterns (socket types).
  343. #if defined ZMQ_HAVE_OPENPGM || defined ZMQ_HAVE_NORM
  344. #if defined ZMQ_HAVE_OPENPGM && defined ZMQ_HAVE_NORM
  345. if ((protocol_ == protocol_name::pgm || protocol_ == protocol_name::epgm
  346. || protocol_ == protocol_name::norm)
  347. #elif defined ZMQ_HAVE_OPENPGM
  348. if ((protocol_ == protocol_name::pgm || protocol_ == protocol_name::epgm)
  349. #else // defined ZMQ_HAVE_NORM
  350. if (protocol_ == protocol_name::norm
  351. #endif
  352. && options.type != ZMQ_PUB && options.type != ZMQ_SUB
  353. && options.type != ZMQ_XPUB && options.type != ZMQ_XSUB) {
  354. errno = ENOCOMPATPROTO;
  355. return -1;
  356. }
  357. #endif
  358. if (protocol_ == protocol_name::udp
  359. && (options.type != ZMQ_DISH && options.type != ZMQ_RADIO
  360. && options.type != ZMQ_DGRAM)) {
  361. errno = ENOCOMPATPROTO;
  362. return -1;
  363. }
  364. // Protocol is available.
  365. return 0;
  366. }
  367. void zmq::socket_base_t::attach_pipe (pipe_t *pipe_,
  368. bool subscribe_to_all_,
  369. bool locally_initiated_)
  370. {
  371. // First, register the pipe so that we can terminate it later on.
  372. pipe_->set_event_sink (this);
  373. _pipes.push_back (pipe_);
  374. // Let the derived socket type know about new pipe.
  375. xattach_pipe (pipe_, subscribe_to_all_, locally_initiated_);
  376. // If the socket is already being closed, ask any new pipes to terminate
  377. // straight away.
  378. if (is_terminating ()) {
  379. register_term_acks (1);
  380. pipe_->terminate (false);
  381. }
  382. }
  383. int zmq::socket_base_t::setsockopt (int option_,
  384. const void *optval_,
  385. size_t optvallen_)
  386. {
  387. scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
  388. if (unlikely (_ctx_terminated)) {
  389. errno = ETERM;
  390. return -1;
  391. }
  392. // First, check whether specific socket type overloads the option.
  393. int rc = xsetsockopt (option_, optval_, optvallen_);
  394. if (rc == 0 || errno != EINVAL) {
  395. return rc;
  396. }
  397. // If the socket type doesn't support the option, pass it to
  398. // the generic option parser.
  399. rc = options.setsockopt (option_, optval_, optvallen_);
  400. update_pipe_options (option_);
  401. return rc;
  402. }
  403. int zmq::socket_base_t::getsockopt (int option_,
  404. void *optval_,
  405. size_t *optvallen_)
  406. {
  407. scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
  408. if (unlikely (_ctx_terminated)) {
  409. errno = ETERM;
  410. return -1;
  411. }
  412. if (option_ == ZMQ_RCVMORE) {
  413. return do_getsockopt<int> (optval_, optvallen_, _rcvmore ? 1 : 0);
  414. }
  415. if (option_ == ZMQ_FD) {
  416. if (_thread_safe) {
  417. // thread safe socket doesn't provide file descriptor
  418. errno = EINVAL;
  419. return -1;
  420. }
  421. return do_getsockopt<fd_t> (
  422. optval_, optvallen_,
  423. (static_cast<mailbox_t *> (_mailbox))->get_fd ());
  424. }
  425. if (option_ == ZMQ_EVENTS) {
  426. const int rc = process_commands (0, false);
  427. if (rc != 0 && (errno == EINTR || errno == ETERM)) {
  428. return -1;
  429. }
  430. errno_assert (rc == 0);
  431. return do_getsockopt<int> (optval_, optvallen_,
  432. (has_out () ? ZMQ_POLLOUT : 0)
  433. | (has_in () ? ZMQ_POLLIN : 0));
  434. }
  435. if (option_ == ZMQ_LAST_ENDPOINT) {
  436. return do_getsockopt (optval_, optvallen_, _last_endpoint);
  437. }
  438. if (option_ == ZMQ_THREAD_SAFE) {
  439. return do_getsockopt<int> (optval_, optvallen_, _thread_safe ? 1 : 0);
  440. }
  441. return options.getsockopt (option_, optval_, optvallen_);
  442. }
  443. int zmq::socket_base_t::join (const char *group_)
  444. {
  445. scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
  446. return xjoin (group_);
  447. }
  448. int zmq::socket_base_t::leave (const char *group_)
  449. {
  450. scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
  451. return xleave (group_);
  452. }
  453. void zmq::socket_base_t::add_signaler (signaler_t *s_)
  454. {
  455. zmq_assert (_thread_safe);
  456. scoped_lock_t sync_lock (_sync);
  457. (static_cast<mailbox_safe_t *> (_mailbox))->add_signaler (s_);
  458. }
  459. void zmq::socket_base_t::remove_signaler (signaler_t *s_)
  460. {
  461. zmq_assert (_thread_safe);
  462. scoped_lock_t sync_lock (_sync);
  463. (static_cast<mailbox_safe_t *> (_mailbox))->remove_signaler (s_);
  464. }
  465. int zmq::socket_base_t::bind (const char *endpoint_uri_)
  466. {
  467. scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
  468. if (unlikely (_ctx_terminated)) {
  469. errno = ETERM;
  470. return -1;
  471. }
  472. // Process pending commands, if any.
  473. int rc = process_commands (0, false);
  474. if (unlikely (rc != 0)) {
  475. return -1;
  476. }
  477. // Parse endpoint_uri_ string.
  478. std::string protocol;
  479. std::string address;
  480. if (parse_uri (endpoint_uri_, protocol, address)
  481. || check_protocol (protocol)) {
  482. return -1;
  483. }
  484. if (protocol == protocol_name::inproc) {
  485. const endpoint_t endpoint = {this, options};
  486. rc = register_endpoint (endpoint_uri_, endpoint);
  487. if (rc == 0) {
  488. connect_pending (endpoint_uri_, this);
  489. _last_endpoint.assign (endpoint_uri_);
  490. options.connected = true;
  491. }
  492. return rc;
  493. }
  494. #if defined ZMQ_HAVE_OPENPGM || defined ZMQ_HAVE_NORM
  495. #if defined ZMQ_HAVE_OPENPGM && defined ZMQ_HAVE_NORM
  496. if (protocol == protocol_name::pgm || protocol == protocol_name::epgm
  497. || protocol == protocol_name::norm) {
  498. #elif defined ZMQ_HAVE_OPENPGM
  499. if (protocol == protocol_name::pgm || protocol == protocol_name::epgm) {
  500. #else // defined ZMQ_HAVE_NORM
  501. if (protocol == protocol_name::norm) {
  502. #endif
  503. // For convenience's sake, bind can be used interchangeable with
  504. // connect for PGM, EPGM, NORM transports.
  505. rc = connect (endpoint_uri_);
  506. if (rc != -1)
  507. options.connected = true;
  508. return rc;
  509. }
  510. #endif
  511. if (protocol == protocol_name::udp) {
  512. if (!(options.type == ZMQ_DGRAM || options.type == ZMQ_DISH)) {
  513. errno = ENOCOMPATPROTO;
  514. return -1;
  515. }
  516. // Choose the I/O thread to run the session in.
  517. io_thread_t *io_thread = choose_io_thread (options.affinity);
  518. if (!io_thread) {
  519. errno = EMTHREAD;
  520. return -1;
  521. }
  522. address_t *paddr =
  523. new (std::nothrow) address_t (protocol, address, this->get_ctx ());
  524. alloc_assert (paddr);
  525. paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
  526. alloc_assert (paddr->resolved.udp_addr);
  527. rc = paddr->resolved.udp_addr->resolve (address.c_str (), true,
  528. options.ipv6);
  529. if (rc != 0) {
  530. LIBZMQ_DELETE (paddr);
  531. return -1;
  532. }
  533. session_base_t *session =
  534. session_base_t::create (io_thread, true, this, options, paddr);
  535. errno_assert (session);
  536. // Create a bi-directional pipe.
  537. object_t *parents[2] = {this, session};
  538. pipe_t *new_pipes[2] = {NULL, NULL};
  539. int hwms[2] = {options.sndhwm, options.rcvhwm};
  540. bool conflates[2] = {false, false};
  541. rc = pipepair (parents, new_pipes, hwms, conflates);
  542. errno_assert (rc == 0);
  543. // Attach local end of the pipe to the socket object.
  544. attach_pipe (new_pipes[0], true, true);
  545. pipe_t *const newpipe = new_pipes[0];
  546. // Attach remote end of the pipe to the session object later on.
  547. session->attach_pipe (new_pipes[1]);
  548. // Save last endpoint URI
  549. paddr->to_string (_last_endpoint);
  550. // TODO shouldn't this use _last_endpoint instead of endpoint_uri_? as in the other cases
  551. add_endpoint (endpoint_uri_pair_t (endpoint_uri_, std::string (),
  552. endpoint_type_none),
  553. static_cast<own_t *> (session), newpipe);
  554. return 0;
  555. }
  556. // Remaining transports require to be run in an I/O thread, so at this
  557. // point we'll choose one.
  558. io_thread_t *io_thread = choose_io_thread (options.affinity);
  559. if (!io_thread) {
  560. errno = EMTHREAD;
  561. return -1;
  562. }
  563. if (protocol == protocol_name::tcp) {
  564. tcp_listener_t *listener =
  565. new (std::nothrow) tcp_listener_t (io_thread, this, options);
  566. alloc_assert (listener);
  567. rc = listener->set_local_address (address.c_str ());
  568. if (rc != 0) {
  569. LIBZMQ_DELETE (listener);
  570. event_bind_failed (make_unconnected_bind_endpoint_pair (address),
  571. zmq_errno ());
  572. return -1;
  573. }
  574. // Save last endpoint URI
  575. listener->get_local_address (_last_endpoint);
  576. add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint),
  577. static_cast<own_t *> (listener), NULL);
  578. options.connected = true;
  579. return 0;
  580. }
  581. #ifdef ZMQ_HAVE_WS
  582. #ifdef ZMQ_HAVE_WSS
  583. if (protocol == protocol_name::ws || protocol == protocol_name::wss) {
  584. ws_listener_t *listener = new (std::nothrow) ws_listener_t (
  585. io_thread, this, options, protocol == protocol_name::wss);
  586. #else
  587. if (protocol == protocol_name::ws) {
  588. ws_listener_t *listener =
  589. new (std::nothrow) ws_listener_t (io_thread, this, options, false);
  590. #endif
  591. alloc_assert (listener);
  592. rc = listener->set_local_address (address.c_str ());
  593. if (rc != 0) {
  594. LIBZMQ_DELETE (listener);
  595. event_bind_failed (make_unconnected_bind_endpoint_pair (address),
  596. zmq_errno ());
  597. return -1;
  598. }
  599. // Save last endpoint URI
  600. listener->get_local_address (_last_endpoint);
  601. add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint),
  602. static_cast<own_t *> (listener), NULL);
  603. options.connected = true;
  604. return 0;
  605. }
  606. #endif
  607. #if defined ZMQ_HAVE_IPC
  608. if (protocol == protocol_name::ipc) {
  609. ipc_listener_t *listener =
  610. new (std::nothrow) ipc_listener_t (io_thread, this, options);
  611. alloc_assert (listener);
  612. int rc = listener->set_local_address (address.c_str ());
  613. if (rc != 0) {
  614. LIBZMQ_DELETE (listener);
  615. event_bind_failed (make_unconnected_bind_endpoint_pair (address),
  616. zmq_errno ());
  617. return -1;
  618. }
  619. // Save last endpoint URI
  620. listener->get_local_address (_last_endpoint);
  621. add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint),
  622. static_cast<own_t *> (listener), NULL);
  623. options.connected = true;
  624. return 0;
  625. }
  626. #endif
  627. #if defined ZMQ_HAVE_TIPC
  628. if (protocol == protocol_name::tipc) {
  629. tipc_listener_t *listener =
  630. new (std::nothrow) tipc_listener_t (io_thread, this, options);
  631. alloc_assert (listener);
  632. int rc = listener->set_local_address (address.c_str ());
  633. if (rc != 0) {
  634. LIBZMQ_DELETE (listener);
  635. event_bind_failed (make_unconnected_bind_endpoint_pair (address),
  636. zmq_errno ());
  637. return -1;
  638. }
  639. // Save last endpoint URI
  640. listener->get_local_address (_last_endpoint);
  641. // TODO shouldn't this use _last_endpoint as in the other cases?
  642. add_endpoint (make_unconnected_bind_endpoint_pair (endpoint_uri_),
  643. static_cast<own_t *> (listener), NULL);
  644. options.connected = true;
  645. return 0;
  646. }
  647. #endif
  648. #if defined ZMQ_HAVE_VMCI
  649. if (protocol == protocol_name::vmci) {
  650. vmci_listener_t *listener =
  651. new (std::nothrow) vmci_listener_t (io_thread, this, options);
  652. alloc_assert (listener);
  653. int rc = listener->set_local_address (address.c_str ());
  654. if (rc != 0) {
  655. LIBZMQ_DELETE (listener);
  656. event_bind_failed (make_unconnected_bind_endpoint_pair (address),
  657. zmq_errno ());
  658. return -1;
  659. }
  660. listener->get_local_address (_last_endpoint);
  661. add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint),
  662. static_cast<own_t *> (listener), NULL);
  663. options.connected = true;
  664. return 0;
  665. }
  666. #endif
  667. zmq_assert (false);
  668. return -1;
  669. }
  670. int zmq::socket_base_t::connect (const char *endpoint_uri_)
  671. {
  672. scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
  673. return connect_internal (endpoint_uri_);
  674. }
  675. int zmq::socket_base_t::connect_internal (const char *endpoint_uri_)
  676. {
  677. if (unlikely (_ctx_terminated)) {
  678. errno = ETERM;
  679. return -1;
  680. }
  681. // Process pending commands, if any.
  682. int rc = process_commands (0, false);
  683. if (unlikely (rc != 0)) {
  684. return -1;
  685. }
  686. // Parse endpoint_uri_ string.
  687. std::string protocol;
  688. std::string address;
  689. if (parse_uri (endpoint_uri_, protocol, address)
  690. || check_protocol (protocol)) {
  691. return -1;
  692. }
  693. if (protocol == protocol_name::inproc) {
  694. // TODO: inproc connect is specific with respect to creating pipes
  695. // as there's no 'reconnect' functionality implemented. Once that
  696. // is in place we should follow generic pipe creation algorithm.
  697. // Find the peer endpoint.
  698. const endpoint_t peer = find_endpoint (endpoint_uri_);
  699. // The total HWM for an inproc connection should be the sum of
  700. // the binder's HWM and the connector's HWM.
  701. const int sndhwm = peer.socket == NULL
  702. ? options.sndhwm
  703. : options.sndhwm != 0 && peer.options.rcvhwm != 0
  704. ? options.sndhwm + peer.options.rcvhwm
  705. : 0;
  706. const int rcvhwm = peer.socket == NULL
  707. ? options.rcvhwm
  708. : options.rcvhwm != 0 && peer.options.sndhwm != 0
  709. ? options.rcvhwm + peer.options.sndhwm
  710. : 0;
  711. // Create a bi-directional pipe to connect the peers.
  712. object_t *parents[2] = {this, peer.socket == NULL ? this : peer.socket};
  713. pipe_t *new_pipes[2] = {NULL, NULL};
  714. const bool conflate = get_effective_conflate_option (options);
  715. int hwms[2] = {conflate ? -1 : sndhwm, conflate ? -1 : rcvhwm};
  716. bool conflates[2] = {conflate, conflate};
  717. rc = pipepair (parents, new_pipes, hwms, conflates);
  718. if (!conflate) {
  719. new_pipes[0]->set_hwms_boost (peer.options.sndhwm,
  720. peer.options.rcvhwm);
  721. new_pipes[1]->set_hwms_boost (options.sndhwm, options.rcvhwm);
  722. }
  723. errno_assert (rc == 0);
  724. if (!peer.socket) {
  725. // The peer doesn't exist yet so we don't know whether
  726. // to send the routing id message or not. To resolve this,
  727. // we always send our routing id and drop it later if
  728. // the peer doesn't expect it.
  729. send_routing_id (new_pipes[0], options);
  730. #ifdef ZMQ_BUILD_DRAFT_API
  731. // If set, send the hello msg of the local socket to the peer.
  732. if (options.can_send_hello_msg && options.hello_msg.size () > 0) {
  733. send_hello_msg (new_pipes[0], options);
  734. }
  735. #endif
  736. const endpoint_t endpoint = {this, options};
  737. pend_connection (std::string (endpoint_uri_), endpoint, new_pipes);
  738. } else {
  739. // If required, send the routing id of the local socket to the peer.
  740. if (peer.options.recv_routing_id) {
  741. send_routing_id (new_pipes[0], options);
  742. }
  743. // If required, send the routing id of the peer to the local socket.
  744. if (options.recv_routing_id) {
  745. send_routing_id (new_pipes[1], peer.options);
  746. }
  747. #ifdef ZMQ_BUILD_DRAFT_API
  748. // If set, send the hello msg of the local socket to the peer.
  749. if (options.can_send_hello_msg && options.hello_msg.size () > 0) {
  750. send_hello_msg (new_pipes[0], options);
  751. }
  752. // If set, send the hello msg of the peer to the local socket.
  753. if (peer.options.can_send_hello_msg
  754. && peer.options.hello_msg.size () > 0) {
  755. send_hello_msg (new_pipes[1], peer.options);
  756. }
  757. if (peer.options.can_recv_disconnect_msg
  758. && peer.options.disconnect_msg.size () > 0)
  759. new_pipes[0]->set_disconnect_msg (peer.options.disconnect_msg);
  760. #endif
  761. // Attach remote end of the pipe to the peer socket. Note that peer's
  762. // seqnum was incremented in find_endpoint function. We don't need it
  763. // increased here.
  764. send_bind (peer.socket, new_pipes[1], false);
  765. }
  766. // Attach local end of the pipe to this socket object.
  767. attach_pipe (new_pipes[0], false, true);
  768. // Save last endpoint URI
  769. _last_endpoint.assign (endpoint_uri_);
  770. // remember inproc connections for disconnect
  771. _inprocs.emplace (endpoint_uri_, new_pipes[0]);
  772. options.connected = true;
  773. return 0;
  774. }
  775. const bool is_single_connect =
  776. (options.type == ZMQ_DEALER || options.type == ZMQ_SUB
  777. || options.type == ZMQ_PUB || options.type == ZMQ_REQ);
  778. if (unlikely (is_single_connect)) {
  779. if (0 != _endpoints.count (endpoint_uri_)) {
  780. // There is no valid use for multiple connects for SUB-PUB nor
  781. // DEALER-ROUTER nor REQ-REP. Multiple connects produces
  782. // nonsensical results.
  783. return 0;
  784. }
  785. }
  786. // Choose the I/O thread to run the session in.
  787. io_thread_t *io_thread = choose_io_thread (options.affinity);
  788. if (!io_thread) {
  789. errno = EMTHREAD;
  790. return -1;
  791. }
  792. address_t *paddr =
  793. new (std::nothrow) address_t (protocol, address, this->get_ctx ());
  794. alloc_assert (paddr);
  795. // Resolve address (if needed by the protocol)
  796. if (protocol == protocol_name::tcp) {
  797. // Do some basic sanity checks on tcp:// address syntax
  798. // - hostname starts with digit or letter, with embedded '-' or '.'
  799. // - IPv6 address may contain hex chars and colons.
  800. // - IPv6 link local address may contain % followed by interface name / zone_id
  801. // (Reference: https://tools.ietf.org/html/rfc4007)
  802. // - IPv4 address may contain decimal digits and dots.
  803. // - Address must end in ":port" where port is *, or numeric
  804. // - Address may contain two parts separated by ':'
  805. // Following code is quick and dirty check to catch obvious errors,
  806. // without trying to be fully accurate.
  807. const char *check = address.c_str ();
  808. if (isalnum (*check) || isxdigit (*check) || *check == '['
  809. || *check == ':') {
  810. check++;
  811. while (isalnum (*check) || isxdigit (*check) || *check == '.'
  812. || *check == '-' || *check == ':' || *check == '%'
  813. || *check == ';' || *check == '[' || *check == ']'
  814. || *check == '_' || *check == '*') {
  815. check++;
  816. }
  817. }
  818. // Assume the worst, now look for success
  819. rc = -1;
  820. // Did we reach the end of the address safely?
  821. if (*check == 0) {
  822. // Do we have a valid port string? (cannot be '*' in connect
  823. check = strrchr (address.c_str (), ':');
  824. if (check) {
  825. check++;
  826. if (*check && (isdigit (*check)))
  827. rc = 0; // Valid
  828. }
  829. }
  830. if (rc == -1) {
  831. errno = EINVAL;
  832. LIBZMQ_DELETE (paddr);
  833. return -1;
  834. }
  835. // Defer resolution until a socket is opened
  836. paddr->resolved.tcp_addr = NULL;
  837. }
  838. #ifdef ZMQ_HAVE_WS
  839. #ifdef ZMQ_HAVE_WSS
  840. else if (protocol == protocol_name::ws || protocol == protocol_name::wss) {
  841. if (protocol == protocol_name::wss) {
  842. paddr->resolved.wss_addr = new (std::nothrow) wss_address_t ();
  843. alloc_assert (paddr->resolved.wss_addr);
  844. rc = paddr->resolved.wss_addr->resolve (address.c_str (), false,
  845. options.ipv6);
  846. } else
  847. #else
  848. else if (protocol == protocol_name::ws) {
  849. #endif
  850. {
  851. paddr->resolved.ws_addr = new (std::nothrow) ws_address_t ();
  852. alloc_assert (paddr->resolved.ws_addr);
  853. rc = paddr->resolved.ws_addr->resolve (address.c_str (), false,
  854. options.ipv6);
  855. }
  856. if (rc != 0) {
  857. LIBZMQ_DELETE (paddr);
  858. return -1;
  859. }
  860. }
  861. #endif
  862. #if defined ZMQ_HAVE_IPC
  863. else if (protocol == protocol_name::ipc) {
  864. paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t ();
  865. alloc_assert (paddr->resolved.ipc_addr);
  866. int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
  867. if (rc != 0) {
  868. LIBZMQ_DELETE (paddr);
  869. return -1;
  870. }
  871. }
  872. #endif
  873. if (protocol == protocol_name::udp) {
  874. if (options.type != ZMQ_RADIO) {
  875. errno = ENOCOMPATPROTO;
  876. LIBZMQ_DELETE (paddr);
  877. return -1;
  878. }
  879. paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
  880. alloc_assert (paddr->resolved.udp_addr);
  881. rc = paddr->resolved.udp_addr->resolve (address.c_str (), false,
  882. options.ipv6);
  883. if (rc != 0) {
  884. LIBZMQ_DELETE (paddr);
  885. return -1;
  886. }
  887. }
  888. // TBD - Should we check address for ZMQ_HAVE_NORM???
  889. #ifdef ZMQ_HAVE_OPENPGM
  890. if (protocol == protocol_name::pgm || protocol == protocol_name::epgm) {
  891. struct pgm_addrinfo_t *res = NULL;
  892. uint16_t port_number = 0;
  893. int rc =
  894. pgm_socket_t::init_address (address.c_str (), &res, &port_number);
  895. if (res != NULL)
  896. pgm_freeaddrinfo (res);
  897. if (rc != 0 || port_number == 0) {
  898. return -1;
  899. }
  900. }
  901. #endif
  902. #if defined ZMQ_HAVE_TIPC
  903. else if (protocol == protocol_name::tipc) {
  904. paddr->resolved.tipc_addr = new (std::nothrow) tipc_address_t ();
  905. alloc_assert (paddr->resolved.tipc_addr);
  906. int rc = paddr->resolved.tipc_addr->resolve (address.c_str ());
  907. if (rc != 0) {
  908. LIBZMQ_DELETE (paddr);
  909. return -1;
  910. }
  911. const sockaddr_tipc *const saddr =
  912. reinterpret_cast<const sockaddr_tipc *> (
  913. paddr->resolved.tipc_addr->addr ());
  914. // Cannot connect to random Port Identity
  915. if (saddr->addrtype == TIPC_ADDR_ID
  916. && paddr->resolved.tipc_addr->is_random ()) {
  917. LIBZMQ_DELETE (paddr);
  918. errno = EINVAL;
  919. return -1;
  920. }
  921. }
  922. #endif
  923. #if defined ZMQ_HAVE_VMCI
  924. else if (protocol == protocol_name::vmci) {
  925. paddr->resolved.vmci_addr =
  926. new (std::nothrow) vmci_address_t (this->get_ctx ());
  927. alloc_assert (paddr->resolved.vmci_addr);
  928. int rc = paddr->resolved.vmci_addr->resolve (address.c_str ());
  929. if (rc != 0) {
  930. LIBZMQ_DELETE (paddr);
  931. return -1;
  932. }
  933. }
  934. #endif
  935. // Create session.
  936. session_base_t *session =
  937. session_base_t::create (io_thread, true, this, options, paddr);
  938. errno_assert (session);
  939. // PGM does not support subscription forwarding; ask for all data to be
  940. // sent to this pipe. (same for NORM, currently?)
  941. #if defined ZMQ_HAVE_OPENPGM && defined ZMQ_HAVE_NORM
  942. const bool subscribe_to_all =
  943. protocol == protocol_name::pgm || protocol == protocol_name::epgm
  944. || protocol == protocol_name::norm || protocol == protocol_name::udp;
  945. #elif defined ZMQ_HAVE_OPENPGM
  946. const bool subscribe_to_all = protocol == protocol_name::pgm
  947. || protocol == protocol_name::epgm
  948. || protocol == protocol_name::udp;
  949. #elif defined ZMQ_HAVE_NORM
  950. const bool subscribe_to_all =
  951. protocol == protocol_name::norm || protocol == protocol_name::udp;
  952. #else
  953. const bool subscribe_to_all = protocol == protocol_name::udp;
  954. #endif
  955. pipe_t *newpipe = NULL;
  956. if (options.immediate != 1 || subscribe_to_all) {
  957. // Create a bi-directional pipe.
  958. object_t *parents[2] = {this, session};
  959. pipe_t *new_pipes[2] = {NULL, NULL};
  960. const bool conflate = get_effective_conflate_option (options);
  961. int hwms[2] = {conflate ? -1 : options.sndhwm,
  962. conflate ? -1 : options.rcvhwm};
  963. bool conflates[2] = {conflate, conflate};
  964. rc = pipepair (parents, new_pipes, hwms, conflates);
  965. errno_assert (rc == 0);
  966. // Attach local end of the pipe to the socket object.
  967. attach_pipe (new_pipes[0], subscribe_to_all, true);
  968. newpipe = new_pipes[0];
  969. // Attach remote end of the pipe to the session object later on.
  970. session->attach_pipe (new_pipes[1]);
  971. }
  972. // Save last endpoint URI
  973. paddr->to_string (_last_endpoint);
  974. add_endpoint (make_unconnected_connect_endpoint_pair (endpoint_uri_),
  975. static_cast<own_t *> (session), newpipe);
  976. return 0;
  977. }
  978. std::string
  979. zmq::socket_base_t::resolve_tcp_addr (std::string endpoint_uri_pair_,
  980. const char *tcp_address_)
  981. {
  982. // The resolved last_endpoint is used as a key in the endpoints map.
  983. // The address passed by the user might not match in the TCP case due to
  984. // IPv4-in-IPv6 mapping (EG: tcp://[::ffff:127.0.0.1]:9999), so try to
  985. // resolve before giving up. Given at this stage we don't know whether a
  986. // socket is connected or bound, try with both.
  987. if (_endpoints.find (endpoint_uri_pair_) == _endpoints.end ()) {
  988. tcp_address_t *tcp_addr = new (std::nothrow) tcp_address_t ();
  989. alloc_assert (tcp_addr);
  990. int rc = tcp_addr->resolve (tcp_address_, false, options.ipv6);
  991. if (rc == 0) {
  992. tcp_addr->to_string (endpoint_uri_pair_);
  993. if (_endpoints.find (endpoint_uri_pair_) == _endpoints.end ()) {
  994. rc = tcp_addr->resolve (tcp_address_, true, options.ipv6);
  995. if (rc == 0) {
  996. tcp_addr->to_string (endpoint_uri_pair_);
  997. }
  998. }
  999. }
  1000. LIBZMQ_DELETE (tcp_addr);
  1001. }
  1002. return endpoint_uri_pair_;
  1003. }
  1004. void zmq::socket_base_t::add_endpoint (
  1005. const endpoint_uri_pair_t &endpoint_pair_, own_t *endpoint_, pipe_t *pipe_)
  1006. {
  1007. // Activate the session. Make it a child of this socket.
  1008. launch_child (endpoint_);
  1009. _endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (endpoint_pair_.identifier (),
  1010. endpoint_pipe_t (endpoint_, pipe_));
  1011. if (pipe_ != NULL)
  1012. pipe_->set_endpoint_pair (endpoint_pair_);
  1013. }
  1014. int zmq::socket_base_t::term_endpoint (const char *endpoint_uri_)
  1015. {
  1016. scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
  1017. // Check whether the context hasn't been shut down yet.
  1018. if (unlikely (_ctx_terminated)) {
  1019. errno = ETERM;
  1020. return -1;
  1021. }
  1022. // Check whether endpoint address passed to the function is valid.
  1023. if (unlikely (!endpoint_uri_)) {
  1024. errno = EINVAL;
  1025. return -1;
  1026. }
  1027. // Process pending commands, if any, since there could be pending unprocessed process_own()'s
  1028. // (from launch_child() for example) we're asked to terminate now.
  1029. const int rc = process_commands (0, false);
  1030. if (unlikely (rc != 0)) {
  1031. return -1;
  1032. }
  1033. // Parse endpoint_uri_ string.
  1034. std::string uri_protocol;
  1035. std::string uri_path;
  1036. if (parse_uri (endpoint_uri_, uri_protocol, uri_path)
  1037. || check_protocol (uri_protocol)) {
  1038. return -1;
  1039. }
  1040. const std::string endpoint_uri_str = std::string (endpoint_uri_);
  1041. // Disconnect an inproc socket
  1042. if (uri_protocol == protocol_name::inproc) {
  1043. return unregister_endpoint (endpoint_uri_str, this) == 0
  1044. ? 0
  1045. : _inprocs.erase_pipes (endpoint_uri_str);
  1046. }
  1047. const std::string resolved_endpoint_uri =
  1048. uri_protocol == protocol_name::tcp
  1049. ? resolve_tcp_addr (endpoint_uri_str, uri_path.c_str ())
  1050. : endpoint_uri_str;
  1051. // Find the endpoints range (if any) corresponding to the endpoint_uri_pair_ string.
  1052. const std::pair<endpoints_t::iterator, endpoints_t::iterator> range =
  1053. _endpoints.equal_range (resolved_endpoint_uri);
  1054. if (range.first == range.second) {
  1055. errno = ENOENT;
  1056. return -1;
  1057. }
  1058. for (endpoints_t::iterator it = range.first; it != range.second; ++it) {
  1059. // If we have an associated pipe, terminate it.
  1060. if (it->second.second != NULL)
  1061. it->second.second->terminate (false);
  1062. term_child (it->second.first);
  1063. }
  1064. _endpoints.erase (range.first, range.second);
  1065. return 0;
  1066. }
  1067. int zmq::socket_base_t::send (msg_t *msg_, int flags_)
  1068. {
  1069. scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
  1070. // Check whether the context hasn't been shut down yet.
  1071. if (unlikely (_ctx_terminated)) {
  1072. errno = ETERM;
  1073. return -1;
  1074. }
  1075. // Check whether message passed to the function is valid.
  1076. if (unlikely (!msg_ || !msg_->check ())) {
  1077. errno = EFAULT;
  1078. return -1;
  1079. }
  1080. // Process pending commands, if any.
  1081. int rc = process_commands (0, true);
  1082. if (unlikely (rc != 0)) {
  1083. return -1;
  1084. }
  1085. // Clear any user-visible flags that are set on the message.
  1086. msg_->reset_flags (msg_t::more);
  1087. // At this point we impose the flags on the message.
  1088. if (flags_ & ZMQ_SNDMORE)
  1089. msg_->set_flags (msg_t::more);
  1090. msg_->reset_metadata ();
  1091. // Try to send the message using method in each socket class
  1092. rc = xsend (msg_);
  1093. if (rc == 0) {
  1094. return 0;
  1095. }
  1096. // Special case for ZMQ_PUSH: -2 means pipe is dead while a
  1097. // multi-part send is in progress and can't be recovered, so drop
  1098. // silently when in blocking mode to keep backward compatibility.
  1099. if (unlikely (rc == -2)) {
  1100. if (!((flags_ & ZMQ_DONTWAIT) || options.sndtimeo == 0)) {
  1101. rc = msg_->close ();
  1102. errno_assert (rc == 0);
  1103. rc = msg_->init ();
  1104. errno_assert (rc == 0);
  1105. return 0;
  1106. }
  1107. }
  1108. if (unlikely (errno != EAGAIN)) {
  1109. return -1;
  1110. }
  1111. // In case of non-blocking send we'll simply propagate
  1112. // the error - including EAGAIN - up the stack.
  1113. if ((flags_ & ZMQ_DONTWAIT) || options.sndtimeo == 0) {
  1114. return -1;
  1115. }
  1116. // Compute the time when the timeout should occur.
  1117. // If the timeout is infinite, don't care.
  1118. int timeout = options.sndtimeo;
  1119. const uint64_t end = timeout < 0 ? 0 : (_clock.now_ms () + timeout);
  1120. // Oops, we couldn't send the message. Wait for the next
  1121. // command, process it and try to send the message again.
  1122. // If timeout is reached in the meantime, return EAGAIN.
  1123. while (true) {
  1124. if (unlikely (process_commands (timeout, false) != 0)) {
  1125. return -1;
  1126. }
  1127. rc = xsend (msg_);
  1128. if (rc == 0)
  1129. break;
  1130. if (unlikely (errno != EAGAIN)) {
  1131. return -1;
  1132. }
  1133. if (timeout > 0) {
  1134. timeout = static_cast<int> (end - _clock.now_ms ());
  1135. if (timeout <= 0) {
  1136. errno = EAGAIN;
  1137. return -1;
  1138. }
  1139. }
  1140. }
  1141. return 0;
  1142. }
  1143. int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
  1144. {
  1145. scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
  1146. // Check whether the context hasn't been shut down yet.
  1147. if (unlikely (_ctx_terminated)) {
  1148. errno = ETERM;
  1149. return -1;
  1150. }
  1151. // Check whether message passed to the function is valid.
  1152. if (unlikely (!msg_ || !msg_->check ())) {
  1153. errno = EFAULT;
  1154. return -1;
  1155. }
  1156. // Once every inbound_poll_rate messages check for signals and process
  1157. // incoming commands. This happens only if we are not polling altogether
  1158. // because there are messages available all the time. If poll occurs,
  1159. // ticks is set to zero and thus we avoid this code.
  1160. //
  1161. // Note that 'recv' uses different command throttling algorithm (the one
  1162. // described above) from the one used by 'send'. This is because counting
  1163. // ticks is more efficient than doing RDTSC all the time.
  1164. if (++_ticks == inbound_poll_rate) {
  1165. if (unlikely (process_commands (0, false) != 0)) {
  1166. return -1;
  1167. }
  1168. _ticks = 0;
  1169. }
  1170. // Get the message.
  1171. int rc = xrecv (msg_);
  1172. if (unlikely (rc != 0 && errno != EAGAIN)) {
  1173. return -1;
  1174. }
  1175. // If we have the message, return immediately.
  1176. if (rc == 0) {
  1177. extract_flags (msg_);
  1178. return 0;
  1179. }
  1180. // If the message cannot be fetched immediately, there are two scenarios.
  1181. // For non-blocking recv, commands are processed in case there's an
  1182. // activate_reader command already waiting in a command pipe.
  1183. // If it's not, return EAGAIN.
  1184. if ((flags_ & ZMQ_DONTWAIT) || options.rcvtimeo == 0) {
  1185. if (unlikely (process_commands (0, false) != 0)) {
  1186. return -1;
  1187. }
  1188. _ticks = 0;
  1189. rc = xrecv (msg_);
  1190. if (rc < 0) {
  1191. return rc;
  1192. }
  1193. extract_flags (msg_);
  1194. return 0;
  1195. }
  1196. // Compute the time when the timeout should occur.
  1197. // If the timeout is infinite, don't care.
  1198. int timeout = options.rcvtimeo;
  1199. const uint64_t end = timeout < 0 ? 0 : (_clock.now_ms () + timeout);
  1200. // In blocking scenario, commands are processed over and over again until
  1201. // we are able to fetch a message.
  1202. bool block = (_ticks != 0);
  1203. while (true) {
  1204. if (unlikely (process_commands (block ? timeout : 0, false) != 0)) {
  1205. return -1;
  1206. }
  1207. rc = xrecv (msg_);
  1208. if (rc == 0) {
  1209. _ticks = 0;
  1210. break;
  1211. }
  1212. if (unlikely (errno != EAGAIN)) {
  1213. return -1;
  1214. }
  1215. block = true;
  1216. if (timeout > 0) {
  1217. timeout = static_cast<int> (end - _clock.now_ms ());
  1218. if (timeout <= 0) {
  1219. errno = EAGAIN;
  1220. return -1;
  1221. }
  1222. }
  1223. }
  1224. extract_flags (msg_);
  1225. return 0;
  1226. }
  1227. int zmq::socket_base_t::close ()
  1228. {
  1229. scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
  1230. // Remove all existing signalers for thread safe sockets
  1231. if (_thread_safe)
  1232. (static_cast<mailbox_safe_t *> (_mailbox))->clear_signalers ();
  1233. // Mark the socket as dead
  1234. _tag = 0xdeadbeef;
  1235. // Transfer the ownership of the socket from this application thread
  1236. // to the reaper thread which will take care of the rest of shutdown
  1237. // process.
  1238. send_reap (this);
  1239. return 0;
  1240. }
  1241. bool zmq::socket_base_t::has_in ()
  1242. {
  1243. return xhas_in ();
  1244. }
  1245. bool zmq::socket_base_t::has_out ()
  1246. {
  1247. return xhas_out ();
  1248. }
  1249. void zmq::socket_base_t::start_reaping (poller_t *poller_)
  1250. {
  1251. // Plug the socket to the reaper thread.
  1252. _poller = poller_;
  1253. fd_t fd;
  1254. if (!_thread_safe)
  1255. fd = (static_cast<mailbox_t *> (_mailbox))->get_fd ();
  1256. else {
  1257. scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
  1258. _reaper_signaler = new (std::nothrow) signaler_t ();
  1259. zmq_assert (_reaper_signaler);
  1260. // Add signaler to the safe mailbox
  1261. fd = _reaper_signaler->get_fd ();
  1262. (static_cast<mailbox_safe_t *> (_mailbox))
  1263. ->add_signaler (_reaper_signaler);
  1264. // Send a signal to make sure reaper handle existing commands
  1265. _reaper_signaler->send ();
  1266. }
  1267. _handle = _poller->add_fd (fd, this);
  1268. _poller->set_pollin (_handle);
  1269. // Initialise the termination and check whether it can be deallocated
  1270. // immediately.
  1271. terminate ();
  1272. check_destroy ();
  1273. }
  1274. int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
  1275. {
  1276. if (timeout_ == 0) {
  1277. // If we are asked not to wait, check whether we haven't processed
  1278. // commands recently, so that we can throttle the new commands.
  1279. // Get the CPU's tick counter. If 0, the counter is not available.
  1280. const uint64_t tsc = zmq::clock_t::rdtsc ();
  1281. // Optimised version of command processing - it doesn't have to check
  1282. // for incoming commands each time. It does so only if certain time
  1283. // elapsed since last command processing. Command delay varies
  1284. // depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
  1285. // etc. The optimisation makes sense only on platforms where getting
  1286. // a timestamp is a very cheap operation (tens of nanoseconds).
  1287. if (tsc && throttle_) {
  1288. // Check whether TSC haven't jumped backwards (in case of migration
  1289. // between CPU cores) and whether certain time have elapsed since
  1290. // last command processing. If it didn't do nothing.
  1291. if (tsc >= _last_tsc && tsc - _last_tsc <= max_command_delay)
  1292. return 0;
  1293. _last_tsc = tsc;
  1294. }
  1295. }
  1296. // Check whether there are any commands pending for this thread.
  1297. command_t cmd;
  1298. int rc = _mailbox->recv (&cmd, timeout_);
  1299. // Process all available commands.
  1300. while (rc == 0) {
  1301. cmd.destination->process_command (cmd);
  1302. rc = _mailbox->recv (&cmd, 0);
  1303. }
  1304. if (errno == EINTR)
  1305. return -1;
  1306. zmq_assert (errno == EAGAIN);
  1307. if (_ctx_terminated) {
  1308. errno = ETERM;
  1309. return -1;
  1310. }
  1311. return 0;
  1312. }
  1313. void zmq::socket_base_t::process_stop ()
  1314. {
  1315. // Here, someone have called zmq_ctx_term while the socket was still alive.
  1316. // We'll remember the fact so that any blocking call is interrupted and any
  1317. // further attempt to use the socket will return ETERM. The user is still
  1318. // responsible for calling zmq_close on the socket though!
  1319. scoped_lock_t lock (_monitor_sync);
  1320. stop_monitor ();
  1321. _ctx_terminated = true;
  1322. }
  1323. void zmq::socket_base_t::process_bind (pipe_t *pipe_)
  1324. {
  1325. attach_pipe (pipe_);
  1326. }
  1327. void zmq::socket_base_t::process_term (int linger_)
  1328. {
  1329. // Unregister all inproc endpoints associated with this socket.
  1330. // Doing this we make sure that no new pipes from other sockets (inproc)
  1331. // will be initiated.
  1332. unregister_endpoints (this);
  1333. // Ask all attached pipes to terminate.
  1334. for (pipes_t::size_type i = 0, size = _pipes.size (); i != size; ++i) {
  1335. // Only inprocs might have a disconnect message set
  1336. _pipes[i]->send_disconnect_msg ();
  1337. _pipes[i]->terminate (false);
  1338. }
  1339. register_term_acks (static_cast<int> (_pipes.size ()));
  1340. // Continue the termination process immediately.
  1341. own_t::process_term (linger_);
  1342. }
  1343. void zmq::socket_base_t::process_term_endpoint (std::string *endpoint_)
  1344. {
  1345. term_endpoint (endpoint_->c_str ());
  1346. delete endpoint_;
  1347. }
  1348. void zmq::socket_base_t::process_pipe_stats_publish (
  1349. uint64_t outbound_queue_count_,
  1350. uint64_t inbound_queue_count_,
  1351. endpoint_uri_pair_t *endpoint_pair_)
  1352. {
  1353. uint64_t values[2] = {outbound_queue_count_, inbound_queue_count_};
  1354. event (*endpoint_pair_, values, 2, ZMQ_EVENT_PIPES_STATS);
  1355. delete endpoint_pair_;
  1356. }
  1357. /*
  1358. * There are 2 pipes per connection, and the inbound one _must_ be queried from
  1359. * the I/O thread. So ask the outbound pipe, in the application thread, to send
  1360. * a message (pipe_peer_stats) to its peer. The message will carry the outbound
  1361. * pipe stats and endpoint, and the reference to the socket object.
  1362. * The inbound pipe on the I/O thread will then add its own stats and endpoint,
  1363. * and write back a message to the socket object (pipe_stats_publish) which
  1364. * will raise an event with the data.
  1365. */
  1366. int zmq::socket_base_t::query_pipes_stats ()
  1367. {
  1368. {
  1369. scoped_lock_t lock (_monitor_sync);
  1370. if (!(_monitor_events & ZMQ_EVENT_PIPES_STATS)) {
  1371. errno = EINVAL;
  1372. return -1;
  1373. }
  1374. }
  1375. if (_pipes.size () == 0) {
  1376. errno = EAGAIN;
  1377. return -1;
  1378. }
  1379. for (pipes_t::size_type i = 0, size = _pipes.size (); i != size; ++i) {
  1380. _pipes[i]->send_stats_to_peer (this);
  1381. }
  1382. return 0;
  1383. }
  1384. void zmq::socket_base_t::update_pipe_options (int option_)
  1385. {
  1386. if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM) {
  1387. for (pipes_t::size_type i = 0, size = _pipes.size (); i != size; ++i) {
  1388. _pipes[i]->set_hwms (options.rcvhwm, options.sndhwm);
  1389. _pipes[i]->send_hwms_to_peer (options.sndhwm, options.rcvhwm);
  1390. }
  1391. }
  1392. }
  1393. void zmq::socket_base_t::process_destroy ()
  1394. {
  1395. _destroyed = true;
  1396. }
  1397. int zmq::socket_base_t::xsetsockopt (int, const void *, size_t)
  1398. {
  1399. errno = EINVAL;
  1400. return -1;
  1401. }
  1402. bool zmq::socket_base_t::xhas_out ()
  1403. {
  1404. return false;
  1405. }
  1406. int zmq::socket_base_t::xsend (msg_t *)
  1407. {
  1408. errno = ENOTSUP;
  1409. return -1;
  1410. }
  1411. bool zmq::socket_base_t::xhas_in ()
  1412. {
  1413. return false;
  1414. }
  1415. int zmq::socket_base_t::xjoin (const char *group_)
  1416. {
  1417. LIBZMQ_UNUSED (group_);
  1418. errno = ENOTSUP;
  1419. return -1;
  1420. }
  1421. int zmq::socket_base_t::xleave (const char *group_)
  1422. {
  1423. LIBZMQ_UNUSED (group_);
  1424. errno = ENOTSUP;
  1425. return -1;
  1426. }
  1427. int zmq::socket_base_t::xrecv (msg_t *)
  1428. {
  1429. errno = ENOTSUP;
  1430. return -1;
  1431. }
  1432. void zmq::socket_base_t::xread_activated (pipe_t *)
  1433. {
  1434. zmq_assert (false);
  1435. }
  1436. void zmq::socket_base_t::xwrite_activated (pipe_t *)
  1437. {
  1438. zmq_assert (false);
  1439. }
  1440. void zmq::socket_base_t::xhiccuped (pipe_t *)
  1441. {
  1442. zmq_assert (false);
  1443. }
  1444. void zmq::socket_base_t::in_event ()
  1445. {
  1446. // This function is invoked only once the socket is running in the context
  1447. // of the reaper thread. Process any commands from other threads/sockets
  1448. // that may be available at the moment. Ultimately, the socket will
  1449. // be destroyed.
  1450. {
  1451. scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
  1452. // If the socket is thread safe we need to unsignal the reaper signaler
  1453. if (_thread_safe)
  1454. _reaper_signaler->recv ();
  1455. process_commands (0, false);
  1456. }
  1457. check_destroy ();
  1458. }
  1459. void zmq::socket_base_t::out_event ()
  1460. {
  1461. zmq_assert (false);
  1462. }
  1463. void zmq::socket_base_t::timer_event (int)
  1464. {
  1465. zmq_assert (false);
  1466. }
  1467. void zmq::socket_base_t::check_destroy ()
  1468. {
  1469. // If the object was already marked as destroyed, finish the deallocation.
  1470. if (_destroyed) {
  1471. // Remove the socket from the reaper's poller.
  1472. _poller->rm_fd (_handle);
  1473. // Remove the socket from the context.
  1474. destroy_socket (this);
  1475. // Notify the reaper about the fact.
  1476. send_reaped ();
  1477. // Deallocate.
  1478. own_t::process_destroy ();
  1479. }
  1480. }
  1481. void zmq::socket_base_t::read_activated (pipe_t *pipe_)
  1482. {
  1483. xread_activated (pipe_);
  1484. }
  1485. void zmq::socket_base_t::write_activated (pipe_t *pipe_)
  1486. {
  1487. xwrite_activated (pipe_);
  1488. }
  1489. void zmq::socket_base_t::hiccuped (pipe_t *pipe_)
  1490. {
  1491. if (options.immediate == 1)
  1492. pipe_->terminate (false);
  1493. else
  1494. // Notify derived sockets of the hiccup
  1495. xhiccuped (pipe_);
  1496. }
  1497. void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
  1498. {
  1499. // Notify the specific socket type about the pipe termination.
  1500. xpipe_terminated (pipe_);
  1501. // Remove pipe from inproc pipes
  1502. _inprocs.erase_pipe (pipe_);
  1503. // Remove the pipe from the list of attached pipes and confirm its
  1504. // termination if we are already shutting down.
  1505. _pipes.erase (pipe_);
  1506. // Remove the pipe from _endpoints (set it to NULL).
  1507. const std::string &identifier = pipe_->get_endpoint_pair ().identifier ();
  1508. if (!identifier.empty ()) {
  1509. std::pair<endpoints_t::iterator, endpoints_t::iterator> range;
  1510. range = _endpoints.equal_range (identifier);
  1511. for (endpoints_t::iterator it = range.first; it != range.second; ++it) {
  1512. if (it->second.second == pipe_) {
  1513. it->second.second = NULL;
  1514. break;
  1515. }
  1516. }
  1517. }
  1518. if (is_terminating ())
  1519. unregister_term_ack ();
  1520. }
  1521. void zmq::socket_base_t::extract_flags (const msg_t *msg_)
  1522. {
  1523. // Test whether routing_id flag is valid for this socket type.
  1524. if (unlikely (msg_->flags () & msg_t::routing_id))
  1525. zmq_assert (options.recv_routing_id);
  1526. // Remove MORE flag.
  1527. _rcvmore = (msg_->flags () & msg_t::more) != 0;
  1528. }
  1529. int zmq::socket_base_t::monitor (const char *endpoint_,
  1530. uint64_t events_,
  1531. int event_version_,
  1532. int type_)
  1533. {
  1534. scoped_lock_t lock (_monitor_sync);
  1535. if (unlikely (_ctx_terminated)) {
  1536. errno = ETERM;
  1537. return -1;
  1538. }
  1539. // Event version 1 supports only first 16 events.
  1540. if (unlikely (event_version_ == 1 && events_ >> 16 != 0)) {
  1541. errno = EINVAL;
  1542. return -1;
  1543. }
  1544. // Support deregistering monitoring endpoints as well
  1545. if (endpoint_ == NULL) {
  1546. stop_monitor ();
  1547. return 0;
  1548. }
  1549. // Parse endpoint_uri_ string.
  1550. std::string protocol;
  1551. std::string address;
  1552. if (parse_uri (endpoint_, protocol, address) || check_protocol (protocol))
  1553. return -1;
  1554. // Event notification only supported over inproc://
  1555. if (protocol != protocol_name::inproc) {
  1556. errno = EPROTONOSUPPORT;
  1557. return -1;
  1558. }
  1559. // already monitoring. Stop previous monitor before starting new one.
  1560. if (_monitor_socket != NULL) {
  1561. stop_monitor (true);
  1562. }
  1563. // Check if the specified socket type is supported. It must be a
  1564. // one-way socket types that support the SNDMORE flag.
  1565. switch (type_) {
  1566. case ZMQ_PAIR:
  1567. break;
  1568. case ZMQ_PUB:
  1569. break;
  1570. case ZMQ_PUSH:
  1571. break;
  1572. default:
  1573. errno = EINVAL;
  1574. return -1;
  1575. }
  1576. // Register events to monitor
  1577. _monitor_events = events_;
  1578. options.monitor_event_version = event_version_;
  1579. // Create a monitor socket of the specified type.
  1580. _monitor_socket = zmq_socket (get_ctx (), type_);
  1581. if (_monitor_socket == NULL)
  1582. return -1;
  1583. // Never block context termination on pending event messages
  1584. int linger = 0;
  1585. int rc =
  1586. zmq_setsockopt (_monitor_socket, ZMQ_LINGER, &linger, sizeof (linger));
  1587. if (rc == -1)
  1588. stop_monitor (false);
  1589. // Spawn the monitor socket endpoint
  1590. rc = zmq_bind (_monitor_socket, endpoint_);
  1591. if (rc == -1)
  1592. stop_monitor (false);
  1593. return rc;
  1594. }
  1595. void zmq::socket_base_t::event_connected (
  1596. const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
  1597. {
  1598. uint64_t values[1] = {static_cast<uint64_t> (fd_)};
  1599. event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CONNECTED);
  1600. }
  1601. void zmq::socket_base_t::event_connect_delayed (
  1602. const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
  1603. {
  1604. uint64_t values[1] = {static_cast<uint64_t> (err_)};
  1605. event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CONNECT_DELAYED);
  1606. }
  1607. void zmq::socket_base_t::event_connect_retried (
  1608. const endpoint_uri_pair_t &endpoint_uri_pair_, int interval_)
  1609. {
  1610. uint64_t values[1] = {static_cast<uint64_t> (interval_)};
  1611. event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CONNECT_RETRIED);
  1612. }
  1613. void zmq::socket_base_t::event_listening (
  1614. const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
  1615. {
  1616. uint64_t values[1] = {static_cast<uint64_t> (fd_)};
  1617. event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_LISTENING);
  1618. }
  1619. void zmq::socket_base_t::event_bind_failed (
  1620. const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
  1621. {
  1622. uint64_t values[1] = {static_cast<uint64_t> (err_)};
  1623. event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_BIND_FAILED);
  1624. }
  1625. void zmq::socket_base_t::event_accepted (
  1626. const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
  1627. {
  1628. uint64_t values[1] = {static_cast<uint64_t> (fd_)};
  1629. event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_ACCEPTED);
  1630. }
  1631. void zmq::socket_base_t::event_accept_failed (
  1632. const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
  1633. {
  1634. uint64_t values[1] = {static_cast<uint64_t> (err_)};
  1635. event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_ACCEPT_FAILED);
  1636. }
  1637. void zmq::socket_base_t::event_closed (
  1638. const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
  1639. {
  1640. uint64_t values[1] = {static_cast<uint64_t> (fd_)};
  1641. event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CLOSED);
  1642. }
  1643. void zmq::socket_base_t::event_close_failed (
  1644. const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
  1645. {
  1646. uint64_t values[1] = {static_cast<uint64_t> (err_)};
  1647. event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CLOSE_FAILED);
  1648. }
  1649. void zmq::socket_base_t::event_disconnected (
  1650. const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
  1651. {
  1652. uint64_t values[1] = {static_cast<uint64_t> (fd_)};
  1653. event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_DISCONNECTED);
  1654. }
  1655. void zmq::socket_base_t::event_handshake_failed_no_detail (
  1656. const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
  1657. {
  1658. uint64_t values[1] = {static_cast<uint64_t> (err_)};
  1659. event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL);
  1660. }
  1661. void zmq::socket_base_t::event_handshake_failed_protocol (
  1662. const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
  1663. {
  1664. uint64_t values[1] = {static_cast<uint64_t> (err_)};
  1665. event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL);
  1666. }
  1667. void zmq::socket_base_t::event_handshake_failed_auth (
  1668. const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
  1669. {
  1670. uint64_t values[1] = {static_cast<uint64_t> (err_)};
  1671. event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_FAILED_AUTH);
  1672. }
  1673. void zmq::socket_base_t::event_handshake_succeeded (
  1674. const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
  1675. {
  1676. uint64_t values[1] = {static_cast<uint64_t> (err_)};
  1677. event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_SUCCEEDED);
  1678. }
  1679. void zmq::socket_base_t::event (const endpoint_uri_pair_t &endpoint_uri_pair_,
  1680. uint64_t values_[],
  1681. uint64_t values_count_,
  1682. uint64_t type_)
  1683. {
  1684. scoped_lock_t lock (_monitor_sync);
  1685. if (_monitor_events & type_) {
  1686. monitor_event (type_, values_, values_count_, endpoint_uri_pair_);
  1687. }
  1688. }
  1689. // Send a monitor event
  1690. void zmq::socket_base_t::monitor_event (
  1691. uint64_t event_,
  1692. const uint64_t values_[],
  1693. uint64_t values_count_,
  1694. const endpoint_uri_pair_t &endpoint_uri_pair_) const
  1695. {
  1696. // this is a private method which is only called from
  1697. // contexts where the _monitor_sync mutex has been locked before
  1698. if (_monitor_socket) {
  1699. zmq_msg_t msg;
  1700. switch (options.monitor_event_version) {
  1701. case 1: {
  1702. // The API should not allow to activate unsupported events
  1703. zmq_assert (event_ <= std::numeric_limits<uint16_t>::max ());
  1704. // v1 only allows one value
  1705. zmq_assert (values_count_ == 1);
  1706. zmq_assert (values_[0]
  1707. <= std::numeric_limits<uint32_t>::max ());
  1708. // Send event and value in first frame
  1709. const uint16_t event = static_cast<uint16_t> (event_);
  1710. const uint32_t value = static_cast<uint32_t> (values_[0]);
  1711. zmq_msg_init_size (&msg, sizeof (event) + sizeof (value));
  1712. uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg));
  1713. // Avoid dereferencing uint32_t on unaligned address
  1714. memcpy (data + 0, &event, sizeof (event));
  1715. memcpy (data + sizeof (event), &value, sizeof (value));
  1716. zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
  1717. const std::string &endpoint_uri =
  1718. endpoint_uri_pair_.identifier ();
  1719. // Send address in second frame
  1720. zmq_msg_init_size (&msg, endpoint_uri.size ());
  1721. memcpy (zmq_msg_data (&msg), endpoint_uri.c_str (),
  1722. endpoint_uri.size ());
  1723. zmq_msg_send (&msg, _monitor_socket, 0);
  1724. } break;
  1725. case 2: {
  1726. // Send event in first frame (64bit unsigned)
  1727. zmq_msg_init_size (&msg, sizeof (event_));
  1728. memcpy (zmq_msg_data (&msg), &event_, sizeof (event_));
  1729. zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
  1730. // Send number of values that will follow in second frame
  1731. zmq_msg_init_size (&msg, sizeof (values_count_));
  1732. memcpy (zmq_msg_data (&msg), &values_count_,
  1733. sizeof (values_count_));
  1734. zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
  1735. // Send values in third-Nth frames (64bit unsigned)
  1736. for (uint64_t i = 0; i < values_count_; ++i) {
  1737. zmq_msg_init_size (&msg, sizeof (values_[i]));
  1738. memcpy (zmq_msg_data (&msg), &values_[i],
  1739. sizeof (values_[i]));
  1740. zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
  1741. }
  1742. // Send local endpoint URI in second-to-last frame (string)
  1743. zmq_msg_init_size (&msg, endpoint_uri_pair_.local.size ());
  1744. memcpy (zmq_msg_data (&msg), endpoint_uri_pair_.local.c_str (),
  1745. endpoint_uri_pair_.local.size ());
  1746. zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
  1747. // Send remote endpoint URI in last frame (string)
  1748. zmq_msg_init_size (&msg, endpoint_uri_pair_.remote.size ());
  1749. memcpy (zmq_msg_data (&msg), endpoint_uri_pair_.remote.c_str (),
  1750. endpoint_uri_pair_.remote.size ());
  1751. zmq_msg_send (&msg, _monitor_socket, 0);
  1752. } break;
  1753. }
  1754. }
  1755. }
  1756. void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_)
  1757. {
  1758. // this is a private method which is only called from
  1759. // contexts where the _monitor_sync mutex has been locked before
  1760. if (_monitor_socket) {
  1761. if ((_monitor_events & ZMQ_EVENT_MONITOR_STOPPED)
  1762. && send_monitor_stopped_event_) {
  1763. uint64_t values[1] = {0};
  1764. monitor_event (ZMQ_EVENT_MONITOR_STOPPED, values, 1,
  1765. endpoint_uri_pair_t ());
  1766. }
  1767. zmq_close (_monitor_socket);
  1768. _monitor_socket = NULL;
  1769. _monitor_events = 0;
  1770. }
  1771. }
  1772. zmq::routing_socket_base_t::routing_socket_base_t (class ctx_t *parent_,
  1773. uint32_t tid_,
  1774. int sid_) :
  1775. socket_base_t (parent_, tid_, sid_)
  1776. {
  1777. }
  1778. zmq::routing_socket_base_t::~routing_socket_base_t ()
  1779. {
  1780. zmq_assert (_out_pipes.empty ());
  1781. }
  1782. int zmq::routing_socket_base_t::xsetsockopt (int option_,
  1783. const void *optval_,
  1784. size_t optvallen_)
  1785. {
  1786. switch (option_) {
  1787. case ZMQ_CONNECT_ROUTING_ID:
  1788. // TODO why isn't it possible to set an empty connect_routing_id
  1789. // (which is the default value)
  1790. if (optval_ && optvallen_) {
  1791. _connect_routing_id.assign (static_cast<const char *> (optval_),
  1792. optvallen_);
  1793. return 0;
  1794. }
  1795. break;
  1796. }
  1797. errno = EINVAL;
  1798. return -1;
  1799. }
  1800. void zmq::routing_socket_base_t::xwrite_activated (pipe_t *pipe_)
  1801. {
  1802. const out_pipes_t::iterator end = _out_pipes.end ();
  1803. out_pipes_t::iterator it;
  1804. for (it = _out_pipes.begin (); it != end; ++it)
  1805. if (it->second.pipe == pipe_)
  1806. break;
  1807. zmq_assert (it != end);
  1808. zmq_assert (!it->second.active);
  1809. it->second.active = true;
  1810. }
  1811. std::string zmq::routing_socket_base_t::extract_connect_routing_id ()
  1812. {
  1813. std::string res = ZMQ_MOVE (_connect_routing_id);
  1814. _connect_routing_id.clear ();
  1815. return res;
  1816. }
  1817. bool zmq::routing_socket_base_t::connect_routing_id_is_set () const
  1818. {
  1819. return !_connect_routing_id.empty ();
  1820. }
  1821. void zmq::routing_socket_base_t::add_out_pipe (blob_t routing_id_,
  1822. pipe_t *pipe_)
  1823. {
  1824. // Add the record into output pipes lookup table
  1825. const out_pipe_t outpipe = {pipe_, true};
  1826. const bool ok =
  1827. _out_pipes.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (routing_id_), outpipe)
  1828. .second;
  1829. zmq_assert (ok);
  1830. }
  1831. bool zmq::routing_socket_base_t::has_out_pipe (const blob_t &routing_id_) const
  1832. {
  1833. return 0 != _out_pipes.count (routing_id_);
  1834. }
  1835. zmq::routing_socket_base_t::out_pipe_t *
  1836. zmq::routing_socket_base_t::lookup_out_pipe (const blob_t &routing_id_)
  1837. {
  1838. // TODO we could probably avoid constructor a temporary blob_t to call this function
  1839. out_pipes_t::iterator it = _out_pipes.find (routing_id_);
  1840. return it == _out_pipes.end () ? NULL : &it->second;
  1841. }
  1842. const zmq::routing_socket_base_t::out_pipe_t *
  1843. zmq::routing_socket_base_t::lookup_out_pipe (const blob_t &routing_id_) const
  1844. {
  1845. // TODO we could probably avoid constructor a temporary blob_t to call this function
  1846. const out_pipes_t::const_iterator it = _out_pipes.find (routing_id_);
  1847. return it == _out_pipes.end () ? NULL : &it->second;
  1848. }
  1849. void zmq::routing_socket_base_t::erase_out_pipe (const pipe_t *pipe_)
  1850. {
  1851. const size_t erased = _out_pipes.erase (pipe_->get_routing_id ());
  1852. zmq_assert (erased);
  1853. }
  1854. zmq::routing_socket_base_t::out_pipe_t
  1855. zmq::routing_socket_base_t::try_erase_out_pipe (const blob_t &routing_id_)
  1856. {
  1857. const out_pipes_t::iterator it = _out_pipes.find (routing_id_);
  1858. out_pipe_t res = {NULL, false};
  1859. if (it != _out_pipes.end ()) {
  1860. res = it->second;
  1861. _out_pipes.erase (it);
  1862. }
  1863. return res;
  1864. }