norm_engine.cpp 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721
  1. #include "precompiled.hpp"
  2. #include "platform.hpp"
  3. #if defined ZMQ_HAVE_NORM
  4. #include "norm_engine.hpp"
  5. #include "session_base.hpp"
  6. #include "v2_protocol.hpp"
  7. zmq::norm_engine_t::norm_engine_t (io_thread_t *parent_,
  8. const options_t &options_) :
  9. io_object_t (parent_),
  10. zmq_session (NULL),
  11. options (options_),
  12. norm_instance (NORM_INSTANCE_INVALID),
  13. norm_session (NORM_SESSION_INVALID),
  14. is_sender (false),
  15. is_receiver (false),
  16. zmq_encoder (0),
  17. norm_tx_stream (NORM_OBJECT_INVALID),
  18. tx_first_msg (true),
  19. tx_more_bit (false),
  20. zmq_output_ready (false),
  21. norm_tx_ready (false),
  22. tx_index (0),
  23. tx_len (0),
  24. zmq_input_ready (false)
  25. {
  26. int rc = tx_msg.init ();
  27. errno_assert (0 == rc);
  28. }
  29. zmq::norm_engine_t::~norm_engine_t ()
  30. {
  31. shutdown (); // in case it was not already called
  32. }
  33. int zmq::norm_engine_t::init (const char *network_, bool send, bool recv)
  34. {
  35. // Parse the "network_" address int "iface", "addr", and "port"
  36. // norm endpoint format: [id,][<iface>;]<addr>:<port>
  37. // First, look for optional local NormNodeId
  38. // (default NORM_NODE_ANY causes NORM to use host IP addr for NormNodeId)
  39. NormNodeId localId = NORM_NODE_ANY;
  40. const char *ifacePtr = strchr (network_, ',');
  41. if (NULL != ifacePtr) {
  42. size_t idLen = ifacePtr - network_;
  43. if (idLen > 31)
  44. idLen = 31;
  45. char idText[32];
  46. strncpy (idText, network_, idLen);
  47. idText[idLen] = '\0';
  48. localId = (NormNodeId) atoi (idText);
  49. ifacePtr++;
  50. } else {
  51. ifacePtr = network_;
  52. }
  53. // Second, look for optional multicast ifaceName
  54. char ifaceName[256];
  55. const char *addrPtr = strchr (ifacePtr, ';');
  56. if (NULL != addrPtr) {
  57. size_t ifaceLen = addrPtr - ifacePtr;
  58. if (ifaceLen > 255)
  59. ifaceLen = 255; // return error instead?
  60. strncpy (ifaceName, ifacePtr, ifaceLen);
  61. ifaceName[ifaceLen] = '\0';
  62. ifacePtr = ifaceName;
  63. addrPtr++;
  64. } else {
  65. addrPtr = ifacePtr;
  66. ifacePtr = NULL;
  67. }
  68. // Finally, parse IP address and port number
  69. const char *portPtr = strrchr (addrPtr, ':');
  70. if (NULL == portPtr) {
  71. errno = EINVAL;
  72. return -1;
  73. }
  74. char addr[256];
  75. size_t addrLen = portPtr - addrPtr;
  76. if (addrLen > 255)
  77. addrLen = 255;
  78. strncpy (addr, addrPtr, addrLen);
  79. addr[addrLen] = '\0';
  80. portPtr++;
  81. unsigned short portNumber = atoi (portPtr);
  82. if (NORM_INSTANCE_INVALID == norm_instance) {
  83. if (NORM_INSTANCE_INVALID == (norm_instance = NormCreateInstance ())) {
  84. // errno set by whatever caused NormCreateInstance() to fail
  85. return -1;
  86. }
  87. }
  88. // TBD - What do we use for our local NormNodeId?
  89. // (for now we use automatic, IP addr based assignment or passed in 'id')
  90. // a) Use ZMQ Identity somehow?
  91. // b) Add function to use iface addr
  92. // c) Randomize and implement a NORM session layer
  93. // conflict detection/resolution protocol
  94. norm_session = NormCreateSession (norm_instance, addr, portNumber, localId);
  95. if (NORM_SESSION_INVALID == norm_session) {
  96. int savedErrno = errno;
  97. NormDestroyInstance (norm_instance);
  98. norm_instance = NORM_INSTANCE_INVALID;
  99. errno = savedErrno;
  100. return -1;
  101. }
  102. // There's many other useful NORM options that could be applied here
  103. if (NormIsUnicastAddress (addr)) {
  104. NormSetDefaultUnicastNack (norm_session, true);
  105. } else {
  106. // These only apply for multicast sessions
  107. //NormSetTTL(norm_session, options.multicast_hops); // ZMQ default is 1
  108. NormSetTTL (
  109. norm_session,
  110. 255); // since the ZMQ_MULTICAST_HOPS socket option isn't well-supported
  111. NormSetRxPortReuse (
  112. norm_session,
  113. true); // port reuse doesn't work for non-connected unicast
  114. NormSetLoopback (norm_session,
  115. true); // needed when multicast users on same machine
  116. if (NULL != ifacePtr) {
  117. // Note a bad interface may not be caught until sender or receiver start
  118. // (Since sender/receiver is not yet started, this always succeeds here)
  119. NormSetMulticastInterface (norm_session, ifacePtr);
  120. }
  121. }
  122. if (recv) {
  123. // The alternative NORM_SYNC_CURRENT here would provide "instant"
  124. // receiver sync to the sender's _current_ message transmission.
  125. // NORM_SYNC_STREAM tries to get everything the sender has cached/buffered
  126. NormSetDefaultSyncPolicy (norm_session, NORM_SYNC_STREAM);
  127. if (!NormStartReceiver (norm_session, 2 * 1024 * 1024)) {
  128. // errno set by whatever failed
  129. int savedErrno = errno;
  130. NormDestroyInstance (norm_instance); // session gets closed, too
  131. norm_session = NORM_SESSION_INVALID;
  132. norm_instance = NORM_INSTANCE_INVALID;
  133. errno = savedErrno;
  134. return -1;
  135. }
  136. is_receiver = true;
  137. }
  138. if (send) {
  139. // Pick a random sender instance id (aka norm sender session id)
  140. NormSessionId instanceId = NormGetRandomSessionId ();
  141. // TBD - provide "options" for some NORM sender parameters
  142. if (!NormStartSender (norm_session, instanceId, 2 * 1024 * 1024, 1400,
  143. 16, 4)) {
  144. // errno set by whatever failed
  145. int savedErrno = errno;
  146. NormDestroyInstance (norm_instance); // session gets closed, too
  147. norm_session = NORM_SESSION_INVALID;
  148. norm_instance = NORM_INSTANCE_INVALID;
  149. errno = savedErrno;
  150. return -1;
  151. }
  152. NormSetCongestionControl (norm_session, true);
  153. norm_tx_ready = true;
  154. is_sender = true;
  155. if (NORM_OBJECT_INVALID
  156. == (norm_tx_stream =
  157. NormStreamOpen (norm_session, 2 * 1024 * 1024))) {
  158. // errno set by whatever failed
  159. int savedErrno = errno;
  160. NormDestroyInstance (norm_instance); // session gets closed, too
  161. norm_session = NORM_SESSION_INVALID;
  162. norm_instance = NORM_INSTANCE_INVALID;
  163. errno = savedErrno;
  164. return -1;
  165. }
  166. }
  167. //NormSetMessageTrace(norm_session, true);
  168. //NormSetDebugLevel(3);
  169. //NormOpenDebugLog(norm_instance, "normLog.txt");
  170. return 0; // no error
  171. } // end zmq::norm_engine_t::init()
  172. void zmq::norm_engine_t::shutdown ()
  173. {
  174. // TBD - implement a more graceful shutdown option
  175. if (is_receiver) {
  176. NormStopReceiver (norm_session);
  177. // delete any active NormRxStreamState
  178. rx_pending_list.Destroy ();
  179. rx_ready_list.Destroy ();
  180. msg_ready_list.Destroy ();
  181. is_receiver = false;
  182. }
  183. if (is_sender) {
  184. NormStopSender (norm_session);
  185. is_sender = false;
  186. }
  187. if (NORM_SESSION_INVALID != norm_session) {
  188. NormDestroySession (norm_session);
  189. norm_session = NORM_SESSION_INVALID;
  190. }
  191. if (NORM_INSTANCE_INVALID != norm_instance) {
  192. NormStopInstance (norm_instance);
  193. NormDestroyInstance (norm_instance);
  194. norm_instance = NORM_INSTANCE_INVALID;
  195. }
  196. } // end zmq::norm_engine_t::shutdown()
  197. void zmq::norm_engine_t::plug (io_thread_t *io_thread_,
  198. session_base_t *session_)
  199. {
  200. // TBD - we may assign the NORM engine to an io_thread in the future???
  201. zmq_session = session_;
  202. if (is_sender)
  203. zmq_output_ready = true;
  204. if (is_receiver)
  205. zmq_input_ready = true;
  206. fd_t normDescriptor = NormGetDescriptor (norm_instance);
  207. norm_descriptor_handle = add_fd (normDescriptor);
  208. // Set POLLIN for notification of pending NormEvents
  209. set_pollin (norm_descriptor_handle);
  210. if (is_sender)
  211. send_data ();
  212. } // end zmq::norm_engine_t::init()
  213. void zmq::norm_engine_t::unplug ()
  214. {
  215. rm_fd (norm_descriptor_handle);
  216. zmq_session = NULL;
  217. } // end zmq::norm_engine_t::unplug()
  218. void zmq::norm_engine_t::terminate ()
  219. {
  220. unplug ();
  221. shutdown ();
  222. delete this;
  223. }
  224. void zmq::norm_engine_t::restart_output ()
  225. {
  226. // There's new message data available from the session
  227. zmq_output_ready = true;
  228. if (norm_tx_ready)
  229. send_data ();
  230. } // end zmq::norm_engine_t::restart_output()
  231. void zmq::norm_engine_t::send_data ()
  232. {
  233. // Here we write as much as is available or we can
  234. while (zmq_output_ready && norm_tx_ready) {
  235. if (0 == tx_len) {
  236. // Our tx_buffer needs data to send
  237. // Get more data from encoder
  238. size_t space = BUFFER_SIZE;
  239. unsigned char *bufPtr = (unsigned char *) tx_buffer;
  240. tx_len = zmq_encoder.encode (&bufPtr, space);
  241. if (0 == tx_len) {
  242. if (tx_first_msg) {
  243. // We don't need to mark eom/flush until a message is sent
  244. tx_first_msg = false;
  245. } else {
  246. // A prior message was completely written to stream, so
  247. // mark end-of-message and possibly flush (to force packet transmission,
  248. // even if it's not a full segment so message gets delivered quickly)
  249. // NormStreamMarkEom(norm_tx_stream); // the flush below marks eom
  250. // Note NORM_FLUSH_ACTIVE makes NORM fairly chatty for low duty cycle messaging
  251. // but makes sure content is delivered quickly. Positive acknowledgements
  252. // with flush override would make NORM more succinct here
  253. NormStreamFlush (norm_tx_stream, true, NORM_FLUSH_ACTIVE);
  254. }
  255. // Need to pull and load a new message to send
  256. if (-1 == zmq_session->pull_msg (&tx_msg)) {
  257. // We need to wait for "restart_output()" to be called by ZMQ
  258. zmq_output_ready = false;
  259. break;
  260. }
  261. zmq_encoder.load_msg (&tx_msg);
  262. // Should we write message size header for NORM to use? Or expect NORM
  263. // receiver to decode ZMQ message framing format(s)?
  264. // OK - we need to use a byte to denote when the ZMQ frame is the _first_
  265. // frame of a message so it can be decoded properly when a receiver
  266. // 'syncs' mid-stream. We key off the the state of the 'more_flag'
  267. // I.e.,If more_flag _was_ false previously, this is the first
  268. // frame of a ZMQ message.
  269. if (tx_more_bit)
  270. tx_buffer[0] =
  271. (char) 0xff; // this is not first frame of message
  272. else
  273. tx_buffer[0] = 0x00; // this is first frame of message
  274. tx_more_bit = (0 != (tx_msg.flags () & msg_t::more));
  275. // Go ahead an get a first chunk of the message
  276. bufPtr++;
  277. space--;
  278. tx_len = 1 + zmq_encoder.encode (&bufPtr, space);
  279. tx_index = 0;
  280. }
  281. }
  282. // Do we have data in our tx_buffer pending
  283. if (tx_index < tx_len) {
  284. // We have data in our tx_buffer to send, so write it to the stream
  285. tx_index += NormStreamWrite (norm_tx_stream, tx_buffer + tx_index,
  286. tx_len - tx_index);
  287. if (tx_index < tx_len) {
  288. // NORM stream buffer full, wait for NORM_TX_QUEUE_VACANCY
  289. norm_tx_ready = false;
  290. break;
  291. }
  292. tx_len = 0; // all buffered data was written
  293. }
  294. } // end while (zmq_output_ready && norm_tx_ready)
  295. } // end zmq::norm_engine_t::send_data()
  296. void zmq::norm_engine_t::in_event ()
  297. {
  298. // This means a NormEvent is pending, so call NormGetNextEvent() and handle
  299. NormEvent event;
  300. if (!NormGetNextEvent (norm_instance, &event)) {
  301. // NORM has died before we unplugged?!
  302. zmq_assert (false);
  303. return;
  304. }
  305. switch (event.type) {
  306. case NORM_TX_QUEUE_VACANCY:
  307. case NORM_TX_QUEUE_EMPTY:
  308. if (!norm_tx_ready) {
  309. norm_tx_ready = true;
  310. send_data ();
  311. }
  312. break;
  313. case NORM_RX_OBJECT_NEW:
  314. //break;
  315. case NORM_RX_OBJECT_UPDATED:
  316. recv_data (event.object);
  317. break;
  318. case NORM_RX_OBJECT_ABORTED: {
  319. NormRxStreamState *rxState =
  320. (NormRxStreamState *) NormObjectGetUserData (event.object);
  321. if (NULL != rxState) {
  322. // Remove the state from the list it's in
  323. // This is now unnecessary since deletion takes care of list removal
  324. // but in the interest of being clear ...
  325. NormRxStreamState::List *list = rxState->AccessList ();
  326. if (NULL != list)
  327. list->Remove (*rxState);
  328. }
  329. delete rxState;
  330. break;
  331. }
  332. case NORM_REMOTE_SENDER_INACTIVE:
  333. // Here we free resources used for this formerly active sender.
  334. // Note w/ NORM_SYNC_STREAM, if sender reactivates, we may
  335. // get some messages delivered twice. NORM_SYNC_CURRENT would
  336. // mitigate that but might miss data at startup. Always tradeoffs.
  337. // Instead of immediately deleting, we could instead initiate a
  338. // user configurable timeout here to wait some amount of time
  339. // after this event to declare the remote sender truly dead
  340. // and delete its state???
  341. NormNodeDelete (event.sender);
  342. break;
  343. default:
  344. // We ignore some NORM events
  345. break;
  346. }
  347. } // zmq::norm_engine_t::in_event()
  348. bool zmq::norm_engine_t::restart_input ()
  349. {
  350. // TBD - should we check/assert that zmq_input_ready was false???
  351. zmq_input_ready = true;
  352. // Process any pending received messages
  353. if (!msg_ready_list.IsEmpty ())
  354. recv_data (NORM_OBJECT_INVALID);
  355. return true;
  356. } // end zmq::norm_engine_t::restart_input()
  357. void zmq::norm_engine_t::recv_data (NormObjectHandle object)
  358. {
  359. if (NORM_OBJECT_INVALID != object) {
  360. // Call result of NORM_RX_OBJECT_UPDATED notification
  361. // This is a rx_ready indication for a new or existing rx stream
  362. // First, determine if this is a stream we already know
  363. zmq_assert (NORM_OBJECT_STREAM == NormObjectGetType (object));
  364. // Since there can be multiple senders (publishers), we keep
  365. // state for each separate rx stream.
  366. NormRxStreamState *rxState =
  367. (NormRxStreamState *) NormObjectGetUserData (object);
  368. if (NULL == rxState) {
  369. // This is a new stream, so create rxState with zmq decoder, etc
  370. rxState = new (std::nothrow)
  371. NormRxStreamState (object, options.maxmsgsize, options.zero_copy,
  372. options.in_batch_size);
  373. errno_assert (rxState);
  374. if (!rxState->Init ()) {
  375. errno_assert (false);
  376. delete rxState;
  377. return;
  378. }
  379. NormObjectSetUserData (object, rxState);
  380. } else if (!rxState->IsRxReady ()) {
  381. // Existing non-ready stream, so remove from pending
  382. // list to be promoted to rx_ready_list ...
  383. rx_pending_list.Remove (*rxState);
  384. }
  385. if (!rxState->IsRxReady ()) {
  386. // TBD - prepend up front for immediate service?
  387. rxState->SetRxReady (true);
  388. rx_ready_list.Append (*rxState);
  389. }
  390. }
  391. // This loop repeats until we've read all data available from "rx ready" inbound streams
  392. // and pushed any accumulated messages we can up to the zmq session.
  393. while (!rx_ready_list.IsEmpty ()
  394. || (zmq_input_ready && !msg_ready_list.IsEmpty ())) {
  395. // Iterate through our rx_ready streams, reading data into the decoder
  396. // (This services incoming "rx ready" streams in a round-robin fashion)
  397. NormRxStreamState::List::Iterator iterator (rx_ready_list);
  398. NormRxStreamState *rxState;
  399. while (NULL != (rxState = iterator.GetNextItem ())) {
  400. switch (rxState->Decode ()) {
  401. case 1: // msg completed
  402. // Complete message decoded, move this stream to msg_ready_list
  403. // to push the message up to the session below. Note the stream
  404. // will be returned to the "rx_ready_list" after that's done
  405. rx_ready_list.Remove (*rxState);
  406. msg_ready_list.Append (*rxState);
  407. continue;
  408. case -1: // decoding error (shouldn't happen w/ NORM, but ...)
  409. // We need to re-sync this stream (decoder buffer was reset)
  410. rxState->SetSync (false);
  411. break;
  412. default: // 0 - need more data
  413. break;
  414. }
  415. // Get more data from this stream
  416. NormObjectHandle stream = rxState->GetStreamHandle ();
  417. // First, make sure we're in sync ...
  418. while (!rxState->InSync ()) {
  419. // seek NORM message start
  420. if (!NormStreamSeekMsgStart (stream)) {
  421. // Need to wait for more data
  422. break;
  423. }
  424. // read message 'flag' byte to see if this it's a 'final' frame
  425. char syncFlag;
  426. unsigned int numBytes = 1;
  427. if (!NormStreamRead (stream, &syncFlag, &numBytes)) {
  428. // broken stream (shouldn't happen after seek msg start?)
  429. zmq_assert (false);
  430. continue;
  431. }
  432. if (0 == numBytes) {
  433. // This probably shouldn't happen either since we found msg start
  434. // Need to wait for more data
  435. break;
  436. }
  437. if (0 == syncFlag)
  438. rxState->SetSync (true);
  439. // else keep seeking ...
  440. } // end while(!rxState->InSync())
  441. if (!rxState->InSync ()) {
  442. // Need more data for this stream, so remove from "rx ready"
  443. // list and iterate to next "rx ready" stream
  444. rxState->SetRxReady (false);
  445. // Move from rx_ready_list to rx_pending_list
  446. rx_ready_list.Remove (*rxState);
  447. rx_pending_list.Append (*rxState);
  448. continue;
  449. }
  450. // Now we're actually ready to read data from the NORM stream to the zmq_decoder
  451. // the underlying zmq_decoder->get_buffer() call sets how much is needed.
  452. unsigned int numBytes = rxState->GetBytesNeeded ();
  453. if (!NormStreamRead (stream, rxState->AccessBuffer (), &numBytes)) {
  454. // broken NORM stream, so re-sync
  455. rxState->Init (); // TBD - check result
  456. // This will retry syncing, and getting data from this stream
  457. // since we don't increment the "it" iterator
  458. continue;
  459. }
  460. rxState->IncrementBufferCount (numBytes);
  461. if (0 == numBytes) {
  462. // All the data available has been read
  463. // Need to wait for NORM_RX_OBJECT_UPDATED for this stream
  464. rxState->SetRxReady (false);
  465. // Move from rx_ready_list to rx_pending_list
  466. rx_ready_list.Remove (*rxState);
  467. rx_pending_list.Append (*rxState);
  468. }
  469. } // end while(NULL != (rxState = iterator.GetNextItem()))
  470. if (zmq_input_ready) {
  471. // At this point, we've made a pass through the "rx_ready" stream list
  472. // Now make a pass through the "msg_pending" list (if the zmq session
  473. // ready for more input). This may possibly return streams back to
  474. // the "rx ready" stream list after their pending message is handled
  475. NormRxStreamState::List::Iterator iterator (msg_ready_list);
  476. NormRxStreamState *rxState;
  477. while (NULL != (rxState = iterator.GetNextItem ())) {
  478. msg_t *msg = rxState->AccessMsg ();
  479. int rc = zmq_session->push_msg (msg);
  480. if (-1 == rc) {
  481. if (EAGAIN == errno) {
  482. // need to wait until session calls "restart_input()"
  483. zmq_input_ready = false;
  484. break;
  485. } else {
  486. // session rejected message?
  487. // TBD - handle this better
  488. zmq_assert (false);
  489. }
  490. }
  491. // else message was accepted.
  492. msg_ready_list.Remove (*rxState);
  493. if (
  494. rxState
  495. ->IsRxReady ()) // Move back to "rx_ready" list to read more data
  496. rx_ready_list.Append (*rxState);
  497. else // Move back to "rx_pending" list until NORM_RX_OBJECT_UPDATED
  498. msg_ready_list.Append (*rxState);
  499. } // end while(NULL != (rxState = iterator.GetNextItem()))
  500. } // end if (zmq_input_ready)
  501. } // end while ((!rx_ready_list.empty() || (zmq_input_ready && !msg_ready_list.empty()))
  502. // Alert zmq of the messages we have pushed up
  503. zmq_session->flush ();
  504. } // end zmq::norm_engine_t::recv_data()
  505. zmq::norm_engine_t::NormRxStreamState::NormRxStreamState (
  506. NormObjectHandle normStream,
  507. int64_t maxMsgSize,
  508. bool zeroCopy,
  509. int inBatchSize) :
  510. norm_stream (normStream),
  511. max_msg_size (maxMsgSize),
  512. zero_copy (zeroCopy),
  513. in_batch_size (inBatchSize),
  514. in_sync (false),
  515. rx_ready (false),
  516. zmq_decoder (NULL),
  517. skip_norm_sync (false),
  518. buffer_ptr (NULL),
  519. buffer_size (0),
  520. buffer_count (0),
  521. prev (NULL),
  522. next (NULL),
  523. list (NULL)
  524. {
  525. }
  526. zmq::norm_engine_t::NormRxStreamState::~NormRxStreamState ()
  527. {
  528. if (NULL != zmq_decoder) {
  529. delete zmq_decoder;
  530. zmq_decoder = NULL;
  531. }
  532. if (NULL != list) {
  533. list->Remove (*this);
  534. list = NULL;
  535. }
  536. }
  537. bool zmq::norm_engine_t::NormRxStreamState::Init ()
  538. {
  539. in_sync = false;
  540. skip_norm_sync = false;
  541. if (NULL != zmq_decoder)
  542. delete zmq_decoder;
  543. zmq_decoder =
  544. new (std::nothrow) v2_decoder_t (in_batch_size, max_msg_size, zero_copy);
  545. alloc_assert (zmq_decoder);
  546. if (NULL != zmq_decoder) {
  547. buffer_count = 0;
  548. buffer_size = 0;
  549. zmq_decoder->get_buffer (&buffer_ptr, &buffer_size);
  550. return true;
  551. } else {
  552. return false;
  553. }
  554. } // end zmq::norm_engine_t::NormRxStreamState::Init()
  555. // This decodes any pending data sitting in our stream decoder buffer
  556. // It returns 1 upon message completion, -1 on error, 1 on msg completion
  557. int zmq::norm_engine_t::NormRxStreamState::Decode ()
  558. {
  559. // If we have pending bytes to decode, process those first
  560. while (buffer_count > 0) {
  561. // There's pending data for the decoder to decode
  562. size_t processed = 0;
  563. // This a bit of a kludgy approach used to weed
  564. // out the NORM ZMQ message transport "syncFlag" byte
  565. // from the ZMQ message stream being decoded (but it works!)
  566. if (skip_norm_sync) {
  567. buffer_ptr++;
  568. buffer_count--;
  569. skip_norm_sync = false;
  570. }
  571. int rc = zmq_decoder->decode (buffer_ptr, buffer_count, processed);
  572. buffer_ptr += processed;
  573. buffer_count -= processed;
  574. switch (rc) {
  575. case 1:
  576. // msg completed
  577. if (0 == buffer_count) {
  578. buffer_size = 0;
  579. zmq_decoder->get_buffer (&buffer_ptr, &buffer_size);
  580. }
  581. skip_norm_sync = true;
  582. return 1;
  583. case -1:
  584. // decoder error (reset decoder and state variables)
  585. in_sync = false;
  586. skip_norm_sync = false; // will get consumed by norm sync check
  587. Init ();
  588. break;
  589. case 0:
  590. // need more data, keep decoding until buffer exhausted
  591. break;
  592. }
  593. }
  594. // Reset buffer pointer/count for next read
  595. buffer_count = 0;
  596. buffer_size = 0;
  597. zmq_decoder->get_buffer (&buffer_ptr, &buffer_size);
  598. return 0; // need more data
  599. } // end zmq::norm_engine_t::NormRxStreamState::Decode()
  600. zmq::norm_engine_t::NormRxStreamState::List::List () : head (NULL), tail (NULL)
  601. {
  602. }
  603. zmq::norm_engine_t::NormRxStreamState::List::~List ()
  604. {
  605. Destroy ();
  606. }
  607. void zmq::norm_engine_t::NormRxStreamState::List::Destroy ()
  608. {
  609. NormRxStreamState *item = head;
  610. while (NULL != item) {
  611. Remove (*item);
  612. delete item;
  613. item = head;
  614. }
  615. } // end zmq::norm_engine_t::NormRxStreamState::List::Destroy()
  616. void zmq::norm_engine_t::NormRxStreamState::List::Append (
  617. NormRxStreamState &item)
  618. {
  619. item.prev = tail;
  620. if (NULL != tail)
  621. tail->next = &item;
  622. else
  623. head = &item;
  624. item.next = NULL;
  625. tail = &item;
  626. item.list = this;
  627. } // end zmq::norm_engine_t::NormRxStreamState::List::Append()
  628. void zmq::norm_engine_t::NormRxStreamState::List::Remove (
  629. NormRxStreamState &item)
  630. {
  631. if (NULL != item.prev)
  632. item.prev->next = item.next;
  633. else
  634. head = item.next;
  635. if (NULL != item.next)
  636. item.next->prev = item.prev;
  637. else
  638. tail = item.prev;
  639. item.prev = item.next = NULL;
  640. item.list = NULL;
  641. } // end zmq::norm_engine_t::NormRxStreamState::List::Remove()
  642. zmq::norm_engine_t::NormRxStreamState::List::Iterator::Iterator (
  643. const List &list) :
  644. next_item (list.head)
  645. {
  646. }
  647. zmq::norm_engine_t::NormRxStreamState *
  648. zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem ()
  649. {
  650. NormRxStreamState *nextItem = next_item;
  651. if (NULL != nextItem)
  652. next_item = nextItem->next;
  653. return nextItem;
  654. } // end zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem()
  655. const zmq::endpoint_uri_pair_t &zmq::norm_engine_t::get_endpoint () const
  656. {
  657. return _empty_endpoint;
  658. }
  659. #endif // ZMQ_HAVE_NORM