encoder.hpp 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. /*
  2. Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
  3. This file is part of libzmq, the ZeroMQ core engine in C++.
  4. libzmq is free software; you can redistribute it and/or modify it under
  5. the terms of the GNU Lesser General Public License (LGPL) as published
  6. by the Free Software Foundation; either version 3 of the License, or
  7. (at your option) any later version.
  8. As a special exception, the Contributors give you permission to link
  9. this library with independent modules to produce an executable,
  10. regardless of the license terms of these independent modules, and to
  11. copy and distribute the resulting executable under terms of your choice,
  12. provided that you also meet, for each linked independent module, the
  13. terms and conditions of the license of that module. An independent
  14. module is a module which is not derived from or based on this library.
  15. If you modify this library, you must extend this exception to your
  16. version of the library.
  17. libzmq is distributed in the hope that it will be useful, but WITHOUT
  18. ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  19. FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
  20. License for more details.
  21. You should have received a copy of the GNU Lesser General Public License
  22. along with this program. If not, see <http://www.gnu.org/licenses/>.
  23. */
  24. #ifndef __ZMQ_ENCODER_HPP_INCLUDED__
  25. #define __ZMQ_ENCODER_HPP_INCLUDED__
  26. #if defined(_MSC_VER)
  27. #ifndef NOMINMAX
  28. #define NOMINMAX
  29. #endif
  30. #endif
  31. #include <stddef.h>
  32. #include <string.h>
  33. #include <stdlib.h>
  34. #include <algorithm>
  35. #include "err.hpp"
  36. #include "i_encoder.hpp"
  37. #include "msg.hpp"
  38. namespace zmq
  39. {
  40. // Helper base class for encoders. It implements the state machine that
  41. // fills the outgoing buffer. Derived classes should implement individual
  42. // state machine actions.
  43. template <typename T> class encoder_base_t : public i_encoder
  44. {
  45. public:
  46. explicit encoder_base_t (size_t bufsize_) :
  47. _write_pos (0),
  48. _to_write (0),
  49. _next (NULL),
  50. _new_msg_flag (false),
  51. _buf_size (bufsize_),
  52. _buf (static_cast<unsigned char *> (malloc (bufsize_))),
  53. _in_progress (NULL)
  54. {
  55. alloc_assert (_buf);
  56. }
  57. ~encoder_base_t () ZMQ_OVERRIDE { free (_buf); }
  58. // The function returns a batch of binary data. The data
  59. // are filled to a supplied buffer. If no buffer is supplied (data_
  60. // points to NULL) decoder object will provide buffer of its own.
  61. size_t encode (unsigned char **data_, size_t size_) ZMQ_FINAL
  62. {
  63. unsigned char *buffer = !*data_ ? _buf : *data_;
  64. const size_t buffersize = !*data_ ? _buf_size : size_;
  65. if (in_progress () == NULL)
  66. return 0;
  67. size_t pos = 0;
  68. while (pos < buffersize) {
  69. // If there are no more data to return, run the state machine.
  70. // If there are still no data, return what we already have
  71. // in the buffer.
  72. if (!_to_write) {
  73. if (_new_msg_flag) {
  74. int rc = _in_progress->close ();
  75. errno_assert (rc == 0);
  76. rc = _in_progress->init ();
  77. errno_assert (rc == 0);
  78. _in_progress = NULL;
  79. break;
  80. }
  81. (static_cast<T *> (this)->*_next) ();
  82. }
  83. // If there are no data in the buffer yet and we are able to
  84. // fill whole buffer in a single go, let's use zero-copy.
  85. // There's no disadvantage to it as we cannot stuck multiple
  86. // messages into the buffer anyway. Note that subsequent
  87. // write(s) are non-blocking, thus each single write writes
  88. // at most SO_SNDBUF bytes at once not depending on how large
  89. // is the chunk returned from here.
  90. // As a consequence, large messages being sent won't block
  91. // other engines running in the same I/O thread for excessive
  92. // amounts of time.
  93. if (!pos && !*data_ && _to_write >= buffersize) {
  94. *data_ = _write_pos;
  95. pos = _to_write;
  96. _write_pos = NULL;
  97. _to_write = 0;
  98. return pos;
  99. }
  100. // Copy data to the buffer. If the buffer is full, return.
  101. const size_t to_copy = std::min (_to_write, buffersize - pos);
  102. memcpy (buffer + pos, _write_pos, to_copy);
  103. pos += to_copy;
  104. _write_pos += to_copy;
  105. _to_write -= to_copy;
  106. }
  107. *data_ = buffer;
  108. return pos;
  109. }
  110. void load_msg (msg_t *msg_) ZMQ_FINAL
  111. {
  112. zmq_assert (in_progress () == NULL);
  113. _in_progress = msg_;
  114. (static_cast<T *> (this)->*_next) ();
  115. }
  116. protected:
  117. // Prototype of state machine action.
  118. typedef void (T::*step_t) ();
  119. // This function should be called from derived class to write the data
  120. // to the buffer and schedule next state machine action.
  121. void next_step (void *write_pos_,
  122. size_t to_write_,
  123. step_t next_,
  124. bool new_msg_flag_)
  125. {
  126. _write_pos = static_cast<unsigned char *> (write_pos_);
  127. _to_write = to_write_;
  128. _next = next_;
  129. _new_msg_flag = new_msg_flag_;
  130. }
  131. msg_t *in_progress () { return _in_progress; }
  132. private:
  133. // Where to get the data to write from.
  134. unsigned char *_write_pos;
  135. // How much data to write before next step should be executed.
  136. size_t _to_write;
  137. // Next step. If set to NULL, it means that associated data stream
  138. // is dead.
  139. step_t _next;
  140. bool _new_msg_flag;
  141. // The buffer for encoded data.
  142. const size_t _buf_size;
  143. unsigned char *const _buf;
  144. msg_t *_in_progress;
  145. ZMQ_NON_COPYABLE_NOR_MOVABLE (encoder_base_t)
  146. };
  147. }
  148. #endif