123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237 |
- #include <httplib.h>
- #include <iostream>
- #include <thread>
- #include <Generator.h>
- #include <boost/process.hpp>
- #include <filesystem>
- //#include <wait.h
- #include "ModuleBase.h"
- #include <httplib.h>
- #include <memory>
- #include <chrono>
- #include "IOptimizable.h"
- using namespace httplib;
- namespace mdd{
- using namespace boost::process;
- class ModuleHTTP : public ModuleBase {
- private:
- std::string _fname;
- std::string _id;
- int _port;
- std::unique_ptr<child> _child;
- protected:
- bool connect();
- std::string str_to_json(const std::string& input);
- template<class T>
- state updateVectorOf(std::vector<std::shared_ptr<T>>& vec, const std::string& msg);;
- state updateInputs();
- state updateOutputs();
- public:
- ModuleHTTP();// std::string fname, std::string id, int port);
- ~ModuleHTTP();
- state update() override;
- bool configure(const std::string& config) override;
- };
- bool ModuleHTTP::connect(){
- Client cli(_id, _port);
- std::string body;
- auto res = cli.Get("/status",
- [&](const char *data, size_t data_length) {
- body.append(data, data_length);
- return true;
- });
- if(body.empty()){
- return false;
- }
- body = std::string(R"()") + body;
- json status = json::parse(body.c_str());
- if (status["status"].get<std::string>() == "ready"){
- return true;
- }
- else{
- return false;
- }
- }
- std::string ModuleHTTP::str_to_json(const std::string& input){
- if(input.empty()){
- return input;
- }
- std::string str = input;
- size_t start_pos = 0;
- while((start_pos = str.find('[', start_pos)) != std::string::npos) {
- str.replace(start_pos, 1, "{");
- start_pos += 1;
- }
- start_pos = 0;
- while((start_pos = str.find(']', start_pos)) != std::string::npos) {
- str.replace(start_pos, 1, "}");
- start_pos += 1;
- }
- return str;
- }
- template<class T>
- state ModuleHTTP::updateVectorOf(std::vector<std::shared_ptr<T>>& vec, const std::string& msg) {
-
- json server = json::parse(msg.c_str());
- int diff = (int)server.size() - (int)vec.size();
- if (diff < 0)
- {
- vec.erase(vec.end() + diff, vec.end());
- }
- for(size_t i=0; i < vec.size(); i++){
- vec[i]->setType(server[i]["type"].get<std::string>());
- vec[i]->setAppendix(i);
- vec[i]->setValue() = server[i]["value"].get<std::vector<double>>();
- }
- for (size_t i = 0; i < diff; i++)
- {
- size_t length = vec.size();
- if (server[length]["value"].is_array())
- {
- vec.emplace_back(server[length]["type"].get<std::string>(), length, server_inputs[length]["value"].get<std::vector<double>>());
- }
- else {
- vec.emplace_back(server[length]["type"].get<std::string>(), length, std::vector<double>{server_inputs[length]["value"].get<double>()});
- std::cout << "Warning: Server expects single values, but mdd work with arrays!" << std::endl;
- }
- }
- return state::UNCHANGED;
- }
- state ModuleHTTP::updateInputs() {
- Client cli(_id, _port);
- std::string body;
- auto res = cli.Get("/inputs",
- [&](const char* data, size_t data_length) {
- body.append(data, data_length);
- return true;
- });
- assert(res->body.empty());
- body = std::string(R"()") + body;
- updateVectorOf(inputs, body);
- }
- state ModuleHTTP::updateOutputs() {
- Client cli(_id, _port);
- std::string body;
- auto res = cli.Get("/outputs",
- [&](const char* data, size_t data_length) {
- body.append(data, data_length);
- return true;
- });
- assert(res->body.empty());
- body = std::string(R"()") + body;
- updateVectorOf(outputs, body);
- }
- ModuleHTTP::ModuleHTTP()// std::string fname, std::string id, int port):
- : ModuleBase(R"JSON(
- [{
- "name":"url",
- "value":""
- },{
- "name":"port",
- "value":256
- },{
- "name":"file",
- "value":""
- }])JSON")
- {
- setType("HTTP");
- }
- bool ModuleHTTP::configure(const std::string& config) {
- json config_parsed = json::parse(config);
- bool found = false;
- for (size_t i = 0; i < config_parsed.size(); i++)
- {
- if (config_parsed[i].contains("name"))
- {
- if (config_parsed[i]["name"].get<std::string>() == "url")
- {
- _id = config_parsed[i]["value"].get<std::string>();
- found = true;
- }
- else if (config_parsed[i]["name"].get<std::string>() == "port")
- {
- _port = config_parsed[i]["value"].get<int>();
- found = true;
- }
- else if (config_parsed[i]["name"].get<std::string>() == "file")
- {
- _fname = config_parsed[i]["value"].get<std::string>();
- found = true;
- }
- else {
- std::cout << "ERROR Configure option: " << config_parsed[i]["name"].dump() << " do not exist!" << std::endl;
- return false;
- }
- }
- else {
- std::cout << "ERROR Wrong configure format: " << config_parsed[i].dump() << std::endl;
- return false;
- }
- }
- if (!_fname.empty()) {
- if (std::filesystem::exists(_fname))
- {
- _child = std::make_unique<child>("python3 " + _fname + " " + std::to_string(_port));
- }
- else {
- std::cout << "ERROR Couldnt find: " << _fname << std::endl;
- return false;
- }
- }
- while (!connect()) {
- std::this_thread::sleep_for(std::chrono::microseconds(500));
- }
- updateInputs();
- updateOutputs();
- return found;
- }
- ModuleHTTP::~ModuleHTTP()
- {
- if (_child != nullptr)
- {
- _child->terminate();
- _child->join();
- }
- }
- state ModuleHTTP::update() {
- Client cli(_id, _port);
- json new_inputs;
- for (int i = 0; i < inputs.size(); ++i) {
- json input;
- input["value"] = getInput(i)->getValue();
- //std::cout << "ModuleHTTP::update: last: " << getInput(i)->getValue().back() << std::endl;
- new_inputs.push_back(input);
- }
- std::string content = new_inputs.dump();
- while (!connect()) {
- std::this_thread::sleep_for(std::chrono::microseconds(500));
- }
- //std::cout << "ModuleHTTP::update: " << content << std::endl;
- auto res = cli.Post("/update",content.size(),
- [&](size_t offset, size_t length, DataSink &sink) {
- sink.write(content.data() + offset, length);
- return true; // return 'false' if you want to cancel the request.
- },"application/json");
- while(!connect()){
- std::this_thread::sleep_for(std::chrono::microseconds(500));
- }
- return updateOutputs();
- }
- }
|