// norm_engine.hpp (5.5 KB)

  1. #ifndef __ZMQ_NORM_ENGINE_HPP_INCLUDED__
  2. #define __ZMQ_NORM_ENGINE_HPP_INCLUDED__
  3. #if defined ZMQ_HAVE_NORM
  4. #include "io_object.hpp"
  5. #include "i_engine.hpp"
  6. #include "options.hpp"
  7. #include "v2_decoder.hpp"
  8. #include "v2_encoder.hpp"
  9. #include <normApi.h>
  10. namespace zmq
  11. {
  12. class io_thread_t;
  13. class msg_t;
  14. class session_base_t;
  15. class norm_engine_t ZMQ_FINAL : public io_object_t, public i_engine
  16. {
  17. public:
  18. norm_engine_t (zmq::io_thread_t *parent_, const options_t &options_);
  19. ~norm_engine_t () ZMQ_FINAL;
  20. // create NORM instance, session, etc
  21. int init (const char *network_, bool send, bool recv);
  22. void shutdown ();
  23. bool has_handshake_stage () ZMQ_FINAL { return false; };
  24. // i_engine interface implementation.
  25. // Plug the engine to the session.
  26. void plug (zmq::io_thread_t *io_thread_,
  27. class session_base_t *session_) ZMQ_FINAL;
  28. // Terminate and deallocate the engine. Note that 'detached'
  29. // events are not fired on termination.
  30. void terminate () ZMQ_FINAL;
  31. // This method is called by the session to signalise that more
  32. // messages can be written to the pipe.
  33. bool restart_input () ZMQ_FINAL;
  34. // This method is called by the session to signalise that there
  35. // are messages to send available.
  36. void restart_output () ZMQ_FINAL;
  37. void zap_msg_available () ZMQ_FINAL {}
  38. const endpoint_uri_pair_t &get_endpoint () const ZMQ_FINAL;
  39. // i_poll_events interface implementation.
  40. // (we only need in_event() for NormEvent notification)
  41. // (i.e., don't have any output events or timers (yet))
  42. void in_event ();
  43. private:
  44. void unplug ();
  45. void send_data ();
  46. void recv_data (NormObjectHandle stream);
  47. enum
  48. {
  49. BUFFER_SIZE = 2048
  50. };
  51. // Used to keep track of streams from multiple senders
  52. class NormRxStreamState
  53. {
  54. public:
  55. NormRxStreamState (NormObjectHandle normStream,
  56. int64_t maxMsgSize,
  57. bool zeroCopy,
  58. int inBatchSize);
  59. ~NormRxStreamState ();
  60. NormObjectHandle GetStreamHandle () const { return norm_stream; }
  61. bool Init ();
  62. void SetRxReady (bool state) { rx_ready = state; }
  63. bool IsRxReady () const { return rx_ready; }
  64. void SetSync (bool state) { in_sync = state; }
  65. bool InSync () const { return in_sync; }
  66. // These are used to feed data to decoder
  67. // and its underlying "msg" buffer
  68. char *AccessBuffer () { return (char *) (buffer_ptr + buffer_count); }
  69. size_t GetBytesNeeded () const { return buffer_size - buffer_count; }
  70. void IncrementBufferCount (size_t count) { buffer_count += count; }
  71. msg_t *AccessMsg () { return zmq_decoder->msg (); }
  72. // This invokes the decoder "decode" method
  73. // returning 0 if more data is needed,
  74. // 1 if the message is complete, If an error
  75. // occurs the 'sync' is dropped and the
  76. // decoder re-initialized
  77. int Decode ();
  78. class List
  79. {
  80. public:
  81. List ();
  82. ~List ();
  83. void Append (NormRxStreamState &item);
  84. void Remove (NormRxStreamState &item);
  85. bool IsEmpty () const { return NULL == head; }
  86. void Destroy ();
  87. class Iterator
  88. {
  89. public:
  90. Iterator (const List &list);
  91. NormRxStreamState *GetNextItem ();
  92. private:
  93. NormRxStreamState *next_item;
  94. };
  95. friend class Iterator;
  96. private:
  97. NormRxStreamState *head;
  98. NormRxStreamState *tail;
  99. }; // end class zmq::norm_engine_t::NormRxStreamState::List
  100. friend class List;
  101. List *AccessList () { return list; }
  102. private:
  103. NormObjectHandle norm_stream;
  104. int64_t max_msg_size;
  105. bool zero_copy;
  106. int in_batch_size;
  107. bool in_sync;
  108. bool rx_ready;
  109. v2_decoder_t *zmq_decoder;
  110. bool skip_norm_sync;
  111. unsigned char *buffer_ptr;
  112. size_t buffer_size;
  113. size_t buffer_count;
  114. NormRxStreamState *prev;
  115. NormRxStreamState *next;
  116. NormRxStreamState::List *list;
  117. }; // end class zmq::norm_engine_t::NormRxStreamState
  118. const endpoint_uri_pair_t _empty_endpoint;
  119. session_base_t *zmq_session;
  120. options_t options;
  121. NormInstanceHandle norm_instance;
  122. handle_t norm_descriptor_handle;
  123. NormSessionHandle norm_session;
  124. bool is_sender;
  125. bool is_receiver;
  126. // Sender state
  127. msg_t tx_msg;
  128. v2_encoder_t zmq_encoder; // for tx messages (we use v2 for now)
  129. NormObjectHandle norm_tx_stream;
  130. bool tx_first_msg;
  131. bool tx_more_bit;
  132. bool zmq_output_ready; // zmq has msg(s) to send
  133. bool norm_tx_ready; // norm has tx queue vacancy
  134. // TBD - maybe don't need buffer if can access zmq message buffer directly?
  135. char tx_buffer[BUFFER_SIZE];
  136. unsigned int tx_index;
  137. unsigned int tx_len;
  138. // Receiver state
  139. // Lists of norm rx streams from remote senders
  140. bool zmq_input_ready; // zmq ready to receive msg(s)
  141. NormRxStreamState::List
  142. rx_pending_list; // rx streams waiting for data reception
  143. NormRxStreamState::List
  144. rx_ready_list; // rx streams ready for NormStreamRead()
  145. NormRxStreamState::List
  146. msg_ready_list; // rx streams w/ msg ready for push to zmq
  147. }; // end class norm_engine_t
  148. }
  149. #endif // ZMQ_HAVE_NORM
  150. #endif // !__ZMQ_NORM_ENGINE_HPP_INCLUDED__