#include #include #include #include #include #include //#include #include #include #include "IOptimizable.h" using namespace httplib; namespace mdd{ using namespace boost::process; class ModuleHTTP : public ModuleBase { private: std::string _fname; std::string _id; int _port; std::unique_ptr _child; protected: bool connect(); std::string str_to_json(const std::string& input); template state updateVectorOf(std::vector>& vec, const std::string& msg);; state updateInputs(); state updateOutputs(); public: ModuleHTTP();// std::string fname, std::string id, int port); ~ModuleHTTP(); state update() override; bool configure(const std::string& config) override; }; bool ModuleHTTP::connect(){ Client cli(_id, _port); std::string body; auto res = cli.Get("/status", [&](const char *data, size_t data_length) { body.append(data, data_length); return true; }); if(body.empty()){ return false; } body = std::string(R"()") + body; json status = json::parse(body.c_str()); if (status["status"].get() == "ready"){ return true; } else{ return false; } } std::string ModuleHTTP::str_to_json(const std::string& input){ if(input.empty()){ return input; } std::string str = input; size_t start_pos = 0; while((start_pos = str.find('[', start_pos)) != std::string::npos) { str.replace(start_pos, 1, "{"); start_pos += 1; } start_pos = 0; while((start_pos = str.find(']', start_pos)) != std::string::npos) { str.replace(start_pos, 1, "}"); start_pos += 1; } return str; } template state ModuleHTTP::updateVectorOf(std::vector>& vec, const std::string& msg) { json server = json::parse(msg.c_str()); int diff = (int)server.size() - (int)vec.size(); if (diff < 0) { vec.erase(vec.end() + diff, vec.end()); } for(size_t i=0; i < vec.size(); i++){ vec[i]->setType(server[i]["type"].get()); vec[i]->setAppendix(i); vec[i]->setValue() = server[i]["value"].get>(); } for (size_t i = 0; i < diff; i++) { size_t length = vec.size(); if (server[length]["value"].is_array()) { vec.emplace_back(server[length]["type"].get(), length, server_inputs[length]["value"].get>()); } else { vec.emplace_back(server[length]["type"].get(), length, std::vector{server_inputs[length]["value"].get()}); std::cout << "Warning: Server expects single values, but mdd work with arrays!" << std::endl; } } return state::UNCHANGED; } state ModuleHTTP::updateInputs() { Client cli(_id, _port); std::string body; auto res = cli.Get("/inputs", [&](const char* data, size_t data_length) { body.append(data, data_length); return true; }); assert(res->body.empty()); body = std::string(R"()") + body; updateVectorOf(inputs, body); } state ModuleHTTP::updateOutputs() { Client cli(_id, _port); std::string body; auto res = cli.Get("/outputs", [&](const char* data, size_t data_length) { body.append(data, data_length); return true; }); assert(res->body.empty()); body = std::string(R"()") + body; updateVectorOf(outputs, body); } ModuleHTTP::ModuleHTTP()// std::string fname, std::string id, int port): : ModuleBase(R"JSON( [{ "name":"url", "value":"" },{ "name":"port", "value":256 },{ "name":"file", "value":"" }])JSON") { setType("HTTP"); } bool ModuleHTTP::configure(const std::string& config) { json config_parsed = json::parse(config); bool found = false; for (size_t i = 0; i < config_parsed.size(); i++) { if (config_parsed[i].contains("name")) { if (config_parsed[i]["name"].get() == "url") { _id = config_parsed[i]["value"].get(); found = true; } else if (config_parsed[i]["name"].get() == "port") { _port = config_parsed[i]["value"].get(); found = true; } else if (config_parsed[i]["name"].get() == "file") { _fname = config_parsed[i]["value"].get(); found = true; } else { std::cout << "ERROR Configure option: " << config_parsed[i]["name"].dump() << " do not exist!" << std::endl; return false; } } else { std::cout << "ERROR Wrong configure format: " << config_parsed[i].dump() << std::endl; return false; } } if (!_fname.empty()) { if (std::filesystem::exists(_fname)) { _child = std::make_unique("python3 " + _fname + " " + std::to_string(_port)); } else { std::cout << "ERROR Couldnt find: " << _fname << std::endl; return false; } } while (!connect()) { std::this_thread::sleep_for(std::chrono::microseconds(500)); } updateInputs(); updateOutputs(); return found; } ModuleHTTP::~ModuleHTTP() { if (_child != nullptr) { _child->terminate(); _child->join(); } } state ModuleHTTP::update() { Client cli(_id, _port); json new_inputs; for (int i = 0; i < inputs.size(); ++i) { json input; input["value"] = getInput(i)->getValue(); //std::cout << "ModuleHTTP::update: last: " << getInput(i)->getValue().back() << std::endl; new_inputs.push_back(input); } std::string content = new_inputs.dump(); while (!connect()) { std::this_thread::sleep_for(std::chrono::microseconds(500)); } //std::cout << "ModuleHTTP::update: " << content << std::endl; auto res = cli.Post("/update",content.size(), [&](size_t offset, size_t length, DataSink &sink) { sink.write(content.data() + offset, length); return true; // return 'false' if you want to cancel the request. },"application/json"); while(!connect()){ std::this_thread::sleep_for(std::chrono::microseconds(500)); } return updateOutputs(); } }