ModuleHTTP.cpp 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. #include <httplib.h>
  2. #include <iostream>
  3. #include <thread>
  4. #include <Generator.h>
  5. #include <boost/process.hpp>
  6. #include <filesystem>
  7. //#include <wait.h
  8. #include "ModuleBase.h"
  9. #include <httplib.h>
  10. #include <memory>
  11. #include <chrono>
  12. #include "IOptimizable.h"
  13. using namespace httplib;
  14. namespace mdd{
  15. using namespace boost::process;
  16. class ModuleHTTP : public ModuleBase {
  17. private:
  18. std::string _fname;
  19. std::string _id;
  20. int _port;
  21. std::unique_ptr<child> _child;
  22. protected:
  23. bool connect();
  24. std::string str_to_json(const std::string& input);
  25. template<class T>
  26. state updateVectorOf(std::vector<std::shared_ptr<T>>& vec, const std::string& msg);;
  27. state updateInputs();
  28. state updateOutputs();
  29. public:
  30. ModuleHTTP();// std::string fname, std::string id, int port);
  31. ~ModuleHTTP();
  32. state update() override;
  33. bool configure(const std::string& config) override;
  34. };
  35. bool ModuleHTTP::connect(){
  36. Client cli(_id, _port);
  37. std::string body;
  38. auto res = cli.Get("/status",
  39. [&](const char *data, size_t data_length) {
  40. body.append(data, data_length);
  41. return true;
  42. });
  43. if(body.empty()){
  44. return false;
  45. }
  46. body = std::string(R"()") + body;
  47. json status = json::parse(body.c_str());
  48. if (status["status"].get<std::string>() == "ready"){
  49. return true;
  50. }
  51. else{
  52. return false;
  53. }
  54. }
  55. std::string ModuleHTTP::str_to_json(const std::string& input){
  56. if(input.empty()){
  57. return input;
  58. }
  59. std::string str = input;
  60. size_t start_pos = 0;
  61. while((start_pos = str.find('[', start_pos)) != std::string::npos) {
  62. str.replace(start_pos, 1, "{");
  63. start_pos += 1;
  64. }
  65. start_pos = 0;
  66. while((start_pos = str.find(']', start_pos)) != std::string::npos) {
  67. str.replace(start_pos, 1, "}");
  68. start_pos += 1;
  69. }
  70. return str;
  71. }
  72. template<class T>
  73. state ModuleHTTP::updateVectorOf(std::vector<std::shared_ptr<T>>& vec, const std::string& msg) {
  74. json server = json::parse(msg.c_str());
  75. int diff = (int)server.size() - (int)vec.size();
  76. if (diff < 0)
  77. {
  78. vec.erase(vec.end() + diff, vec.end());
  79. }
  80. for(size_t i=0; i < vec.size(); i++){
  81. vec[i]->setType(server[i]["type"].get<std::string>());
  82. vec[i]->setAppendix(i);
  83. vec[i]->setValue() = server[i]["value"].get<std::vector<double>>();
  84. }
  85. for (size_t i = 0; i < diff; i++)
  86. {
  87. size_t length = vec.size();
  88. if (server[length]["value"].is_array())
  89. {
  90. vec.emplace_back(server[length]["type"].get<std::string>(), length, server_inputs[length]["value"].get<std::vector<double>>());
  91. }
  92. else {
  93. vec.emplace_back(server[length]["type"].get<std::string>(), length, std::vector<double>{server_inputs[length]["value"].get<double>()});
  94. std::cout << "Warning: Server expects single values, but mdd work with arrays!" << std::endl;
  95. }
  96. }
  97. return state::UNCHANGED;
  98. }
  99. state ModuleHTTP::updateInputs() {
  100. Client cli(_id, _port);
  101. std::string body;
  102. auto res = cli.Get("/inputs",
  103. [&](const char* data, size_t data_length) {
  104. body.append(data, data_length);
  105. return true;
  106. });
  107. assert(res->body.empty());
  108. body = std::string(R"()") + body;
  109. updateVectorOf(inputs, body);
  110. }
  111. state ModuleHTTP::updateOutputs() {
  112. Client cli(_id, _port);
  113. std::string body;
  114. auto res = cli.Get("/outputs",
  115. [&](const char* data, size_t data_length) {
  116. body.append(data, data_length);
  117. return true;
  118. });
  119. assert(res->body.empty());
  120. body = std::string(R"()") + body;
  121. updateVectorOf(outputs, body);
  122. }
  123. ModuleHTTP::ModuleHTTP()// std::string fname, std::string id, int port):
  124. : ModuleBase(R"JSON(
  125. [{
  126. "name":"url",
  127. "value":""
  128. },{
  129. "name":"port",
  130. "value":256
  131. },{
  132. "name":"file",
  133. "value":""
  134. }])JSON")
  135. {
  136. setType("HTTP");
  137. }
  138. bool ModuleHTTP::configure(const std::string& config) {
  139. json config_parsed = json::parse(config);
  140. bool found = false;
  141. for (size_t i = 0; i < config_parsed.size(); i++)
  142. {
  143. if (config_parsed[i].contains("name"))
  144. {
  145. if (config_parsed[i]["name"].get<std::string>() == "url")
  146. {
  147. _id = config_parsed[i]["value"].get<std::string>();
  148. found = true;
  149. }
  150. else if (config_parsed[i]["name"].get<std::string>() == "port")
  151. {
  152. _port = config_parsed[i]["value"].get<int>();
  153. found = true;
  154. }
  155. else if (config_parsed[i]["name"].get<std::string>() == "file")
  156. {
  157. _fname = config_parsed[i]["value"].get<std::string>();
  158. found = true;
  159. }
  160. else {
  161. std::cout << "ERROR Configure option: " << config_parsed[i]["name"].dump() << " do not exist!" << std::endl;
  162. return false;
  163. }
  164. }
  165. else {
  166. std::cout << "ERROR Wrong configure format: " << config_parsed[i].dump() << std::endl;
  167. return false;
  168. }
  169. }
  170. if (!_fname.empty()) {
  171. if (std::filesystem::exists(_fname))
  172. {
  173. _child = std::make_unique<child>("python3 " + _fname + " " + std::to_string(_port));
  174. }
  175. else {
  176. std::cout << "ERROR Couldnt find: " << _fname << std::endl;
  177. return false;
  178. }
  179. }
  180. while (!connect()) {
  181. std::this_thread::sleep_for(std::chrono::microseconds(500));
  182. }
  183. updateInputs();
  184. updateOutputs();
  185. return found;
  186. }
  187. ModuleHTTP::~ModuleHTTP()
  188. {
  189. if (_child != nullptr)
  190. {
  191. _child->terminate();
  192. _child->join();
  193. }
  194. }
  195. state ModuleHTTP::update() {
  196. Client cli(_id, _port);
  197. json new_inputs;
  198. for (int i = 0; i < inputs.size(); ++i) {
  199. json input;
  200. input["value"] = getInput(i)->getValue();
  201. //std::cout << "ModuleHTTP::update: last: " << getInput(i)->getValue().back() << std::endl;
  202. new_inputs.push_back(input);
  203. }
  204. std::string content = new_inputs.dump();
  205. while (!connect()) {
  206. std::this_thread::sleep_for(std::chrono::microseconds(500));
  207. }
  208. //std::cout << "ModuleHTTP::update: " << content << std::endl;
  209. auto res = cli.Post("/update",content.size(),
  210. [&](size_t offset, size_t length, DataSink &sink) {
  211. sink.write(content.data() + offset, length);
  212. return true; // return 'false' if you want to cancel the request.
  213. },"application/json");
  214. while(!connect()){
  215. std::this_thread::sleep_for(std::chrono::microseconds(500));
  216. }
  217. return updateOutputs();
  218. }
  219. }