123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721 |
- #include "precompiled.hpp"
- #include "platform.hpp"
- #if defined ZMQ_HAVE_NORM
- #include "norm_engine.hpp"
- #include "session_base.hpp"
- #include "v2_protocol.hpp"
- zmq::norm_engine_t::norm_engine_t (io_thread_t *parent_,
- const options_t &options_) :
- io_object_t (parent_),
- zmq_session (NULL),
- options (options_),
- norm_instance (NORM_INSTANCE_INVALID),
- norm_session (NORM_SESSION_INVALID),
- is_sender (false),
- is_receiver (false),
- zmq_encoder (0),
- norm_tx_stream (NORM_OBJECT_INVALID),
- tx_first_msg (true),
- tx_more_bit (false),
- zmq_output_ready (false),
- norm_tx_ready (false),
- tx_index (0),
- tx_len (0),
- zmq_input_ready (false)
- {
- int rc = tx_msg.init ();
- errno_assert (0 == rc);
- }
- zmq::norm_engine_t::~norm_engine_t ()
- {
- shutdown (); // in case it was not already called
- }
- int zmq::norm_engine_t::init (const char *network_, bool send, bool recv)
- {
- // Parse the "network_" address int "iface", "addr", and "port"
- // norm endpoint format: [id,][<iface>;]<addr>:<port>
- // First, look for optional local NormNodeId
- // (default NORM_NODE_ANY causes NORM to use host IP addr for NormNodeId)
- NormNodeId localId = NORM_NODE_ANY;
- const char *ifacePtr = strchr (network_, ',');
- if (NULL != ifacePtr) {
- size_t idLen = ifacePtr - network_;
- if (idLen > 31)
- idLen = 31;
- char idText[32];
- strncpy (idText, network_, idLen);
- idText[idLen] = '\0';
- localId = (NormNodeId) atoi (idText);
- ifacePtr++;
- } else {
- ifacePtr = network_;
- }
- // Second, look for optional multicast ifaceName
- char ifaceName[256];
- const char *addrPtr = strchr (ifacePtr, ';');
- if (NULL != addrPtr) {
- size_t ifaceLen = addrPtr - ifacePtr;
- if (ifaceLen > 255)
- ifaceLen = 255; // return error instead?
- strncpy (ifaceName, ifacePtr, ifaceLen);
- ifaceName[ifaceLen] = '\0';
- ifacePtr = ifaceName;
- addrPtr++;
- } else {
- addrPtr = ifacePtr;
- ifacePtr = NULL;
- }
- // Finally, parse IP address and port number
- const char *portPtr = strrchr (addrPtr, ':');
- if (NULL == portPtr) {
- errno = EINVAL;
- return -1;
- }
- char addr[256];
- size_t addrLen = portPtr - addrPtr;
- if (addrLen > 255)
- addrLen = 255;
- strncpy (addr, addrPtr, addrLen);
- addr[addrLen] = '\0';
- portPtr++;
- unsigned short portNumber = atoi (portPtr);
- if (NORM_INSTANCE_INVALID == norm_instance) {
- if (NORM_INSTANCE_INVALID == (norm_instance = NormCreateInstance ())) {
- // errno set by whatever caused NormCreateInstance() to fail
- return -1;
- }
- }
- // TBD - What do we use for our local NormNodeId?
- // (for now we use automatic, IP addr based assignment or passed in 'id')
- // a) Use ZMQ Identity somehow?
- // b) Add function to use iface addr
- // c) Randomize and implement a NORM session layer
- // conflict detection/resolution protocol
- norm_session = NormCreateSession (norm_instance, addr, portNumber, localId);
- if (NORM_SESSION_INVALID == norm_session) {
- int savedErrno = errno;
- NormDestroyInstance (norm_instance);
- norm_instance = NORM_INSTANCE_INVALID;
- errno = savedErrno;
- return -1;
- }
- // There's many other useful NORM options that could be applied here
- if (NormIsUnicastAddress (addr)) {
- NormSetDefaultUnicastNack (norm_session, true);
- } else {
- // These only apply for multicast sessions
- //NormSetTTL(norm_session, options.multicast_hops); // ZMQ default is 1
- NormSetTTL (
- norm_session,
- 255); // since the ZMQ_MULTICAST_HOPS socket option isn't well-supported
- NormSetRxPortReuse (
- norm_session,
- true); // port reuse doesn't work for non-connected unicast
- NormSetLoopback (norm_session,
- true); // needed when multicast users on same machine
- if (NULL != ifacePtr) {
- // Note a bad interface may not be caught until sender or receiver start
- // (Since sender/receiver is not yet started, this always succeeds here)
- NormSetMulticastInterface (norm_session, ifacePtr);
- }
- }
- if (recv) {
- // The alternative NORM_SYNC_CURRENT here would provide "instant"
- // receiver sync to the sender's _current_ message transmission.
- // NORM_SYNC_STREAM tries to get everything the sender has cached/buffered
- NormSetDefaultSyncPolicy (norm_session, NORM_SYNC_STREAM);
- if (!NormStartReceiver (norm_session, 2 * 1024 * 1024)) {
- // errno set by whatever failed
- int savedErrno = errno;
- NormDestroyInstance (norm_instance); // session gets closed, too
- norm_session = NORM_SESSION_INVALID;
- norm_instance = NORM_INSTANCE_INVALID;
- errno = savedErrno;
- return -1;
- }
- is_receiver = true;
- }
- if (send) {
- // Pick a random sender instance id (aka norm sender session id)
- NormSessionId instanceId = NormGetRandomSessionId ();
- // TBD - provide "options" for some NORM sender parameters
- if (!NormStartSender (norm_session, instanceId, 2 * 1024 * 1024, 1400,
- 16, 4)) {
- // errno set by whatever failed
- int savedErrno = errno;
- NormDestroyInstance (norm_instance); // session gets closed, too
- norm_session = NORM_SESSION_INVALID;
- norm_instance = NORM_INSTANCE_INVALID;
- errno = savedErrno;
- return -1;
- }
- NormSetCongestionControl (norm_session, true);
- norm_tx_ready = true;
- is_sender = true;
- if (NORM_OBJECT_INVALID
- == (norm_tx_stream =
- NormStreamOpen (norm_session, 2 * 1024 * 1024))) {
- // errno set by whatever failed
- int savedErrno = errno;
- NormDestroyInstance (norm_instance); // session gets closed, too
- norm_session = NORM_SESSION_INVALID;
- norm_instance = NORM_INSTANCE_INVALID;
- errno = savedErrno;
- return -1;
- }
- }
- //NormSetMessageTrace(norm_session, true);
- //NormSetDebugLevel(3);
- //NormOpenDebugLog(norm_instance, "normLog.txt");
- return 0; // no error
- } // end zmq::norm_engine_t::init()
- void zmq::norm_engine_t::shutdown ()
- {
- // TBD - implement a more graceful shutdown option
- if (is_receiver) {
- NormStopReceiver (norm_session);
- // delete any active NormRxStreamState
- rx_pending_list.Destroy ();
- rx_ready_list.Destroy ();
- msg_ready_list.Destroy ();
- is_receiver = false;
- }
- if (is_sender) {
- NormStopSender (norm_session);
- is_sender = false;
- }
- if (NORM_SESSION_INVALID != norm_session) {
- NormDestroySession (norm_session);
- norm_session = NORM_SESSION_INVALID;
- }
- if (NORM_INSTANCE_INVALID != norm_instance) {
- NormStopInstance (norm_instance);
- NormDestroyInstance (norm_instance);
- norm_instance = NORM_INSTANCE_INVALID;
- }
- } // end zmq::norm_engine_t::shutdown()
- void zmq::norm_engine_t::plug (io_thread_t *io_thread_,
- session_base_t *session_)
- {
- // TBD - we may assign the NORM engine to an io_thread in the future???
- zmq_session = session_;
- if (is_sender)
- zmq_output_ready = true;
- if (is_receiver)
- zmq_input_ready = true;
- fd_t normDescriptor = NormGetDescriptor (norm_instance);
- norm_descriptor_handle = add_fd (normDescriptor);
- // Set POLLIN for notification of pending NormEvents
- set_pollin (norm_descriptor_handle);
- if (is_sender)
- send_data ();
- } // end zmq::norm_engine_t::init()
- void zmq::norm_engine_t::unplug ()
- {
- rm_fd (norm_descriptor_handle);
- zmq_session = NULL;
- } // end zmq::norm_engine_t::unplug()
- void zmq::norm_engine_t::terminate ()
- {
- unplug ();
- shutdown ();
- delete this;
- }
- void zmq::norm_engine_t::restart_output ()
- {
- // There's new message data available from the session
- zmq_output_ready = true;
- if (norm_tx_ready)
- send_data ();
- } // end zmq::norm_engine_t::restart_output()
- void zmq::norm_engine_t::send_data ()
- {
- // Here we write as much as is available or we can
- while (zmq_output_ready && norm_tx_ready) {
- if (0 == tx_len) {
- // Our tx_buffer needs data to send
- // Get more data from encoder
- size_t space = BUFFER_SIZE;
- unsigned char *bufPtr = (unsigned char *) tx_buffer;
- tx_len = zmq_encoder.encode (&bufPtr, space);
- if (0 == tx_len) {
- if (tx_first_msg) {
- // We don't need to mark eom/flush until a message is sent
- tx_first_msg = false;
- } else {
- // A prior message was completely written to stream, so
- // mark end-of-message and possibly flush (to force packet transmission,
- // even if it's not a full segment so message gets delivered quickly)
- // NormStreamMarkEom(norm_tx_stream); // the flush below marks eom
- // Note NORM_FLUSH_ACTIVE makes NORM fairly chatty for low duty cycle messaging
- // but makes sure content is delivered quickly. Positive acknowledgements
- // with flush override would make NORM more succinct here
- NormStreamFlush (norm_tx_stream, true, NORM_FLUSH_ACTIVE);
- }
- // Need to pull and load a new message to send
- if (-1 == zmq_session->pull_msg (&tx_msg)) {
- // We need to wait for "restart_output()" to be called by ZMQ
- zmq_output_ready = false;
- break;
- }
- zmq_encoder.load_msg (&tx_msg);
- // Should we write message size header for NORM to use? Or expect NORM
- // receiver to decode ZMQ message framing format(s)?
- // OK - we need to use a byte to denote when the ZMQ frame is the _first_
- // frame of a message so it can be decoded properly when a receiver
- // 'syncs' mid-stream. We key off the the state of the 'more_flag'
- // I.e.,If more_flag _was_ false previously, this is the first
- // frame of a ZMQ message.
- if (tx_more_bit)
- tx_buffer[0] =
- (char) 0xff; // this is not first frame of message
- else
- tx_buffer[0] = 0x00; // this is first frame of message
- tx_more_bit = (0 != (tx_msg.flags () & msg_t::more));
- // Go ahead an get a first chunk of the message
- bufPtr++;
- space--;
- tx_len = 1 + zmq_encoder.encode (&bufPtr, space);
- tx_index = 0;
- }
- }
- // Do we have data in our tx_buffer pending
- if (tx_index < tx_len) {
- // We have data in our tx_buffer to send, so write it to the stream
- tx_index += NormStreamWrite (norm_tx_stream, tx_buffer + tx_index,
- tx_len - tx_index);
- if (tx_index < tx_len) {
- // NORM stream buffer full, wait for NORM_TX_QUEUE_VACANCY
- norm_tx_ready = false;
- break;
- }
- tx_len = 0; // all buffered data was written
- }
- } // end while (zmq_output_ready && norm_tx_ready)
- } // end zmq::norm_engine_t::send_data()
- void zmq::norm_engine_t::in_event ()
- {
- // This means a NormEvent is pending, so call NormGetNextEvent() and handle
- NormEvent event;
- if (!NormGetNextEvent (norm_instance, &event)) {
- // NORM has died before we unplugged?!
- zmq_assert (false);
- return;
- }
- switch (event.type) {
- case NORM_TX_QUEUE_VACANCY:
- case NORM_TX_QUEUE_EMPTY:
- if (!norm_tx_ready) {
- norm_tx_ready = true;
- send_data ();
- }
- break;
- case NORM_RX_OBJECT_NEW:
- //break;
- case NORM_RX_OBJECT_UPDATED:
- recv_data (event.object);
- break;
- case NORM_RX_OBJECT_ABORTED: {
- NormRxStreamState *rxState =
- (NormRxStreamState *) NormObjectGetUserData (event.object);
- if (NULL != rxState) {
- // Remove the state from the list it's in
- // This is now unnecessary since deletion takes care of list removal
- // but in the interest of being clear ...
- NormRxStreamState::List *list = rxState->AccessList ();
- if (NULL != list)
- list->Remove (*rxState);
- }
- delete rxState;
- break;
- }
- case NORM_REMOTE_SENDER_INACTIVE:
- // Here we free resources used for this formerly active sender.
- // Note w/ NORM_SYNC_STREAM, if sender reactivates, we may
- // get some messages delivered twice. NORM_SYNC_CURRENT would
- // mitigate that but might miss data at startup. Always tradeoffs.
- // Instead of immediately deleting, we could instead initiate a
- // user configurable timeout here to wait some amount of time
- // after this event to declare the remote sender truly dead
- // and delete its state???
- NormNodeDelete (event.sender);
- break;
- default:
- // We ignore some NORM events
- break;
- }
- } // zmq::norm_engine_t::in_event()
- bool zmq::norm_engine_t::restart_input ()
- {
- // TBD - should we check/assert that zmq_input_ready was false???
- zmq_input_ready = true;
- // Process any pending received messages
- if (!msg_ready_list.IsEmpty ())
- recv_data (NORM_OBJECT_INVALID);
- return true;
- } // end zmq::norm_engine_t::restart_input()
- void zmq::norm_engine_t::recv_data (NormObjectHandle object)
- {
- if (NORM_OBJECT_INVALID != object) {
- // Call result of NORM_RX_OBJECT_UPDATED notification
- // This is a rx_ready indication for a new or existing rx stream
- // First, determine if this is a stream we already know
- zmq_assert (NORM_OBJECT_STREAM == NormObjectGetType (object));
- // Since there can be multiple senders (publishers), we keep
- // state for each separate rx stream.
- NormRxStreamState *rxState =
- (NormRxStreamState *) NormObjectGetUserData (object);
- if (NULL == rxState) {
- // This is a new stream, so create rxState with zmq decoder, etc
- rxState = new (std::nothrow)
- NormRxStreamState (object, options.maxmsgsize, options.zero_copy,
- options.in_batch_size);
- errno_assert (rxState);
- if (!rxState->Init ()) {
- errno_assert (false);
- delete rxState;
- return;
- }
- NormObjectSetUserData (object, rxState);
- } else if (!rxState->IsRxReady ()) {
- // Existing non-ready stream, so remove from pending
- // list to be promoted to rx_ready_list ...
- rx_pending_list.Remove (*rxState);
- }
- if (!rxState->IsRxReady ()) {
- // TBD - prepend up front for immediate service?
- rxState->SetRxReady (true);
- rx_ready_list.Append (*rxState);
- }
- }
- // This loop repeats until we've read all data available from "rx ready" inbound streams
- // and pushed any accumulated messages we can up to the zmq session.
- while (!rx_ready_list.IsEmpty ()
- || (zmq_input_ready && !msg_ready_list.IsEmpty ())) {
- // Iterate through our rx_ready streams, reading data into the decoder
- // (This services incoming "rx ready" streams in a round-robin fashion)
- NormRxStreamState::List::Iterator iterator (rx_ready_list);
- NormRxStreamState *rxState;
- while (NULL != (rxState = iterator.GetNextItem ())) {
- switch (rxState->Decode ()) {
- case 1: // msg completed
- // Complete message decoded, move this stream to msg_ready_list
- // to push the message up to the session below. Note the stream
- // will be returned to the "rx_ready_list" after that's done
- rx_ready_list.Remove (*rxState);
- msg_ready_list.Append (*rxState);
- continue;
- case -1: // decoding error (shouldn't happen w/ NORM, but ...)
- // We need to re-sync this stream (decoder buffer was reset)
- rxState->SetSync (false);
- break;
- default: // 0 - need more data
- break;
- }
- // Get more data from this stream
- NormObjectHandle stream = rxState->GetStreamHandle ();
- // First, make sure we're in sync ...
- while (!rxState->InSync ()) {
- // seek NORM message start
- if (!NormStreamSeekMsgStart (stream)) {
- // Need to wait for more data
- break;
- }
- // read message 'flag' byte to see if this it's a 'final' frame
- char syncFlag;
- unsigned int numBytes = 1;
- if (!NormStreamRead (stream, &syncFlag, &numBytes)) {
- // broken stream (shouldn't happen after seek msg start?)
- zmq_assert (false);
- continue;
- }
- if (0 == numBytes) {
- // This probably shouldn't happen either since we found msg start
- // Need to wait for more data
- break;
- }
- if (0 == syncFlag)
- rxState->SetSync (true);
- // else keep seeking ...
- } // end while(!rxState->InSync())
- if (!rxState->InSync ()) {
- // Need more data for this stream, so remove from "rx ready"
- // list and iterate to next "rx ready" stream
- rxState->SetRxReady (false);
- // Move from rx_ready_list to rx_pending_list
- rx_ready_list.Remove (*rxState);
- rx_pending_list.Append (*rxState);
- continue;
- }
- // Now we're actually ready to read data from the NORM stream to the zmq_decoder
- // the underlying zmq_decoder->get_buffer() call sets how much is needed.
- unsigned int numBytes = rxState->GetBytesNeeded ();
- if (!NormStreamRead (stream, rxState->AccessBuffer (), &numBytes)) {
- // broken NORM stream, so re-sync
- rxState->Init (); // TBD - check result
- // This will retry syncing, and getting data from this stream
- // since we don't increment the "it" iterator
- continue;
- }
- rxState->IncrementBufferCount (numBytes);
- if (0 == numBytes) {
- // All the data available has been read
- // Need to wait for NORM_RX_OBJECT_UPDATED for this stream
- rxState->SetRxReady (false);
- // Move from rx_ready_list to rx_pending_list
- rx_ready_list.Remove (*rxState);
- rx_pending_list.Append (*rxState);
- }
- } // end while(NULL != (rxState = iterator.GetNextItem()))
- if (zmq_input_ready) {
- // At this point, we've made a pass through the "rx_ready" stream list
- // Now make a pass through the "msg_pending" list (if the zmq session
- // ready for more input). This may possibly return streams back to
- // the "rx ready" stream list after their pending message is handled
- NormRxStreamState::List::Iterator iterator (msg_ready_list);
- NormRxStreamState *rxState;
- while (NULL != (rxState = iterator.GetNextItem ())) {
- msg_t *msg = rxState->AccessMsg ();
- int rc = zmq_session->push_msg (msg);
- if (-1 == rc) {
- if (EAGAIN == errno) {
- // need to wait until session calls "restart_input()"
- zmq_input_ready = false;
- break;
- } else {
- // session rejected message?
- // TBD - handle this better
- zmq_assert (false);
- }
- }
- // else message was accepted.
- msg_ready_list.Remove (*rxState);
- if (
- rxState
- ->IsRxReady ()) // Move back to "rx_ready" list to read more data
- rx_ready_list.Append (*rxState);
- else // Move back to "rx_pending" list until NORM_RX_OBJECT_UPDATED
- msg_ready_list.Append (*rxState);
- } // end while(NULL != (rxState = iterator.GetNextItem()))
- } // end if (zmq_input_ready)
- } // end while ((!rx_ready_list.empty() || (zmq_input_ready && !msg_ready_list.empty()))
- // Alert zmq of the messages we have pushed up
- zmq_session->flush ();
- } // end zmq::norm_engine_t::recv_data()
- zmq::norm_engine_t::NormRxStreamState::NormRxStreamState (
- NormObjectHandle normStream,
- int64_t maxMsgSize,
- bool zeroCopy,
- int inBatchSize) :
- norm_stream (normStream),
- max_msg_size (maxMsgSize),
- zero_copy (zeroCopy),
- in_batch_size (inBatchSize),
- in_sync (false),
- rx_ready (false),
- zmq_decoder (NULL),
- skip_norm_sync (false),
- buffer_ptr (NULL),
- buffer_size (0),
- buffer_count (0),
- prev (NULL),
- next (NULL),
- list (NULL)
- {
- }
- zmq::norm_engine_t::NormRxStreamState::~NormRxStreamState ()
- {
- if (NULL != zmq_decoder) {
- delete zmq_decoder;
- zmq_decoder = NULL;
- }
- if (NULL != list) {
- list->Remove (*this);
- list = NULL;
- }
- }
- bool zmq::norm_engine_t::NormRxStreamState::Init ()
- {
- in_sync = false;
- skip_norm_sync = false;
- if (NULL != zmq_decoder)
- delete zmq_decoder;
- zmq_decoder =
- new (std::nothrow) v2_decoder_t (in_batch_size, max_msg_size, zero_copy);
- alloc_assert (zmq_decoder);
- if (NULL != zmq_decoder) {
- buffer_count = 0;
- buffer_size = 0;
- zmq_decoder->get_buffer (&buffer_ptr, &buffer_size);
- return true;
- } else {
- return false;
- }
- } // end zmq::norm_engine_t::NormRxStreamState::Init()
- // This decodes any pending data sitting in our stream decoder buffer
- // It returns 1 upon message completion, -1 on error, 1 on msg completion
- int zmq::norm_engine_t::NormRxStreamState::Decode ()
- {
- // If we have pending bytes to decode, process those first
- while (buffer_count > 0) {
- // There's pending data for the decoder to decode
- size_t processed = 0;
- // This a bit of a kludgy approach used to weed
- // out the NORM ZMQ message transport "syncFlag" byte
- // from the ZMQ message stream being decoded (but it works!)
- if (skip_norm_sync) {
- buffer_ptr++;
- buffer_count--;
- skip_norm_sync = false;
- }
- int rc = zmq_decoder->decode (buffer_ptr, buffer_count, processed);
- buffer_ptr += processed;
- buffer_count -= processed;
- switch (rc) {
- case 1:
- // msg completed
- if (0 == buffer_count) {
- buffer_size = 0;
- zmq_decoder->get_buffer (&buffer_ptr, &buffer_size);
- }
- skip_norm_sync = true;
- return 1;
- case -1:
- // decoder error (reset decoder and state variables)
- in_sync = false;
- skip_norm_sync = false; // will get consumed by norm sync check
- Init ();
- break;
- case 0:
- // need more data, keep decoding until buffer exhausted
- break;
- }
- }
- // Reset buffer pointer/count for next read
- buffer_count = 0;
- buffer_size = 0;
- zmq_decoder->get_buffer (&buffer_ptr, &buffer_size);
- return 0; // need more data
- } // end zmq::norm_engine_t::NormRxStreamState::Decode()
- zmq::norm_engine_t::NormRxStreamState::List::List () : head (NULL), tail (NULL)
- {
- }
- zmq::norm_engine_t::NormRxStreamState::List::~List ()
- {
- Destroy ();
- }
- void zmq::norm_engine_t::NormRxStreamState::List::Destroy ()
- {
- NormRxStreamState *item = head;
- while (NULL != item) {
- Remove (*item);
- delete item;
- item = head;
- }
- } // end zmq::norm_engine_t::NormRxStreamState::List::Destroy()
- void zmq::norm_engine_t::NormRxStreamState::List::Append (
- NormRxStreamState &item)
- {
- item.prev = tail;
- if (NULL != tail)
- tail->next = &item;
- else
- head = &item;
- item.next = NULL;
- tail = &item;
- item.list = this;
- } // end zmq::norm_engine_t::NormRxStreamState::List::Append()
- void zmq::norm_engine_t::NormRxStreamState::List::Remove (
- NormRxStreamState &item)
- {
- if (NULL != item.prev)
- item.prev->next = item.next;
- else
- head = item.next;
- if (NULL != item.next)
- item.next->prev = item.prev;
- else
- tail = item.prev;
- item.prev = item.next = NULL;
- item.list = NULL;
- } // end zmq::norm_engine_t::NormRxStreamState::List::Remove()
- zmq::norm_engine_t::NormRxStreamState::List::Iterator::Iterator (
- const List &list) :
- next_item (list.head)
- {
- }
- zmq::norm_engine_t::NormRxStreamState *
- zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem ()
- {
- NormRxStreamState *nextItem = next_item;
- if (NULL != nextItem)
- next_item = nextItem->next;
- return nextItem;
- } // end zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem()
- const zmq::endpoint_uri_pair_t &zmq::norm_engine_t::get_endpoint () const
- {
- return _empty_endpoint;
- }
- #endif // ZMQ_HAVE_NORM
|