diff options
author | Claudius 'keldu' Holeksa <mail@keldu.de> | 2024-08-12 13:42:48 +0200 |
---|---|---|
committer | Claudius 'keldu' Holeksa <mail@keldu.de> | 2024-08-12 13:42:48 +0200 |
commit | e13f6a5e91ffeac86c32ab3a9048b810b1931061 (patch) | |
tree | 1eec0e8f5ca6edfd99581f5b175371a21c8f652b /modules/remote | |
parent | b9a4cf706cf0145c814ef5987dad21ebc4172ac6 (diff) |
wip
Diffstat (limited to 'modules/remote')
-rw-r--r-- | modules/remote/c++/remote.hpp | 1 | ||||
-rw-r--r-- | modules/remote/c++/remote_loopback.hpp | 18 | ||||
-rw-r--r-- | modules/remote/c++/remote_loopback_base.hpp | 66 | ||||
-rw-r--r-- | modules/remote/c++/transfer.hpp | 14 | ||||
-rw-r--r-- | modules/remote/c++/transfer_loopback.hpp | 136 | ||||
-rw-r--r-- | modules/remote/tests/remote_loopback.cpp | 12 |
6 files changed, 194 insertions, 53 deletions
diff --git a/modules/remote/c++/remote.hpp b/modules/remote/c++/remote.hpp index 72c9cce..571a4e7 100644 --- a/modules/remote/c++/remote.hpp +++ b/modules/remote/c++/remote.hpp @@ -102,7 +102,6 @@ public: template<typename Remote> class remote_address { static_assert(always_false<Remote>, "Type of remote not supported"); - /** * */ diff --git a/modules/remote/c++/remote_loopback.hpp b/modules/remote/c++/remote_loopback.hpp index 17f1bfe..fde410d 100644 --- a/modules/remote/c++/remote_loopback.hpp +++ b/modules/remote/c++/remote_loopback.hpp @@ -4,7 +4,7 @@ #include <forstio/codec/interface.hpp> -#include "remote.hpp" +#include "remote_loopback_base.hpp" #include "transfer_loopback.hpp" namespace saw { @@ -53,19 +53,6 @@ class rpc_client<Iface, Encoding, Storage, rmt::Loopback> { */ }; -template<> -class remote_address<rmt::Loopback> { -private: - data<schema::UInt64> addr_id_; -public: - remote_address(data<schema::UInt64> addr_id__): - addr_id_{addr_id__} - {} - - const data<schema::UInt64>& get_address_id() const { - return addr_id_; - } -}; template<typename Iface, typename Encode, typename Storage> class rpc_server<Iface, Encode, Storage, rmt::Loopback> { @@ -81,8 +68,7 @@ public: {} // error_or<id<>> - conveyor< - > call + // conveyor<> call }; } diff --git a/modules/remote/c++/remote_loopback_base.hpp b/modules/remote/c++/remote_loopback_base.hpp index 9b66fa8..87d4393 100644 --- a/modules/remote/c++/remote_loopback_base.hpp +++ b/modules/remote/c++/remote_loopback_base.hpp @@ -4,18 +4,42 @@ #include <map> +#include <forstio/codec/schema_hash.hpp> + namespace saw { namespace rmt { struct Loopback {}; } template<> +class remote_address<rmt::Loopback> { +private: + data<schema::UInt64> addr_id_; +public: + remote_address(data<schema::UInt64> addr_id__): + addr_id_{addr_id__} + {} + + const data<schema::UInt64>& get_address_id() const { + return addr_id_; + } +}; + +template<> class remote<rmt::Loopback> { private: struct key_t { std::array<uint64_t,3> data; - bool operator<(const std::array<uint64_t,3>& rhs) const { + template<typename Schema, typename Encoding> + static key_t create(const remote_address<rmt::Loopback>& addr){ + key_t k; + k.data = std::array<uint64_t,3>{addr.get_address_id().get(), schema_hash<Schema>::apply(), schema_hash<Encoding>::apply()}; + + return k; + } + + bool operator<(const key_t& rhs) const { for(uint64_t i = 0u; i < 3; ++i){ if(data[i] != rhs.data[i]){ return data[i] < rhs.data[i]; @@ -53,11 +77,16 @@ public: */ template<typename Schema, typename Encode> error_or<own<data_server<Schema, Encode, rmt::Loopback>>> data_listen(const remote_address<rmt::Loopback>& addr){ - auto find = registered_data_servers_.find(addr.get_address_id()); - if(find == registered_data_servers_.end()){ + + auto key_v = key_t::template create<Schema,Encode>(addr); + + auto find = registered_data_servers_.find(key_v); + if(find != registered_data_servers_.end()){ return make_error<err::already_exists>("Server already bound to this address"); } - return {addr,*this}; + + auto dat_srv = heap<data_server<Schema, Encode, rmt::Loopback>>(*this, addr); + return dat_srv; } /** @@ -65,16 +94,22 @@ public: */ template<typename Schema, typename Encode> conveyor<data_client<Schema, Encode, rmt::Loopback>> data_connect(const remote_address<rmt::Loopback>& addr){ - auto class_id = srv.get_class_id(); + constexpr auto class_id = data_server<Schema, Encode, rmt::Loopback>::class_id; key_t key; - key.data = {addr.get_address_id(), class_id.first, class_id.second}; + key.data = std::array<uint64_t,3>{addr.get_address_id().get(), class_id.first, class_id.second}; auto find = registered_data_servers_.find(key); if(find == registered_data_servers_.end()){ return make_error<err::not_found>("Server not found"); } + auto eo_dat_srv = find->second().template cast_to<data_server<Schema, Encode, rmt::Loopback>>(); + if(eo_dat_srv.is_error()){ + auto& err = eo_dat_srv.get_error(); + return std::move(err); + } + auto dat_srv = eo_dat_srv.get_value(); - + return data_client<Schema, Encode, rmt::Loopback>{dat_srv}; } /** @@ -84,13 +119,26 @@ public: auto class_id = srv.get_class_id(); key_t key; - key.data = {addr.get_address_id(), class_id.first, class_id.second}; - auto insert = registered_data_servers_.emplace(std::make_pair(key, {srv})); + key.data = std::array<uint64_t,3>{addr.get_address_id().get(), class_id.first, class_id.second}; + + auto insert = registered_data_servers_.emplace(std::make_pair(key, ptr<i_data_server<rmt::Loopback>>{srv})); if(insert.second){ return make_error<err::already_exists>("Server already bound to this address"); } return make_void(); } + + error_or<void> deregister_data_server(const remote_address<rmt::Loopback>& addr, i_data_server<rmt::Loopback>& srv){ + auto class_id = srv.get_class_id(); + key_t key; + key.data = std::array<uint64_t,3>{addr.get_address_id().get(), class_id.first, class_id.second}; + auto erase = registered_data_servers_.erase(key); + if(erase == 0u){ + return make_error<err::not_found>("Server not found"); + } + + return make_void(); + } }; } diff --git a/modules/remote/c++/transfer.hpp b/modules/remote/c++/transfer.hpp index 32416de..f68497b 100644 --- a/modules/remote/c++/transfer.hpp +++ b/modules/remote/c++/transfer.hpp @@ -2,6 +2,8 @@ #include <map> +#include <forstio/common.hpp> + namespace saw { template<typename Remote> class i_data_server { @@ -9,6 +11,18 @@ protected: ~i_data_server() = default; public: virtual std::pair<uint32_t,uint32_t> get_class_id() const = 0; + + template<typename To> + error_or<ptr<To>> cast_to(){ + { + auto rhs = get_class_id(); + if(To::class_id.first == rhs.first && To::class_id.second == rhs.second){ + return {ptr<To>{*static_cast<To*>(this)}}; + } + } + + return make_error<err::invalid_state>("Class IDs are not matching."); + } }; template<typename Schema, typename Encoding, typename Remote> diff --git a/modules/remote/c++/transfer_loopback.hpp b/modules/remote/c++/transfer_loopback.hpp index 55a43fa..9e3c7f6 100644 --- a/modules/remote/c++/transfer_loopback.hpp +++ b/modules/remote/c++/transfer_loopback.hpp @@ -2,12 +2,10 @@ #include <forstio/reduce_templates.hpp> +#include "remote_loopback.hpp" #include "transfer.hpp" namespace saw { -namespace rmt { -struct Loopback {}; -} template<> class remote<rmt::Loopback>; @@ -22,15 +20,17 @@ private: ptr<remote<rmt::Loopback>> remote_; remote_address<rmt::Loopback> rmt_address_; public: + static constexpr std::pair<uint64_t,uint64_t> class_id{schema_hash<Schema>::apply(), schema_hash<Encoding>::apply()}; + data_server(ptr<remote<rmt::Loopback>> remote__, const remote_address<rmt::Loopback>& addr): remote_{remote__}, rmt_address_{addr} { - remote_().register_server(addr); + remote_().register_data_server(addr, *this); } ~data_server(){ - remote_().deregister_server(addr); + remote_().deregister_data_server(rmt_address_, *this); } SAW_FORBID_COPY(data_server); @@ -40,10 +40,19 @@ public: * Return the schema id */ std::pair<uint32_t,uint32_t> get_class_id() const override { - uint32_t schema_hash = schema_hash<Schema>::apply(); - uint32_t encode_hash = schema_hash<Encoding>::apply(); - - return std::make_pair(schema_hash, encode_hash); + return class_id; + } + + error_or<void> send(const data<Schema, Encoding>& dat, id<Schema> store_id){ + try { + auto insert_res = values_.emplace(std::make_pair(store_id.get_value(), std::move(dat))); + if(!insert_res.second){ + return make_error<err::already_exists>(); + } + }catch(std::exception& ){ + return make_error<err::out_of_memory>(); + } + return void_t{}; } }; @@ -55,15 +64,17 @@ private: ptr<remote<rmt::Loopback>> remote_; remote_address<rmt::Loopback> rmt_address_; public: + static constexpr std::pair<uint64_t,uint64_t> class_id{schema_hash<tmpl_group<Schema...>>::apply(), schema_hash<Encoding>::apply()}; + data_server(ptr<remote<rmt::Loopback>> remote__, const remote_address<rmt::Loopback>& addr): remote_{remote__}, rmt_address_{addr} { - remote_().register_server(addr); + remote_().register_data_server(addr,*this); } ~data_server(){ - remote_().deregister_server(addr); + remote_().deregister_data_server(rmt_address_,*this); } SAW_FORBID_COPY(data_server); @@ -73,10 +84,7 @@ public: * Return the schema id */ std::pair<uint32_t,uint32_t> get_class_id() const override { - uint32_t schema_hash = schema_hash<tmpl_group<Schema...>>::apply(); - uint32_t encode_hash = schema_hash<Encoding>::apply(); - - return std::make_pair(schema_hash, encode_hash); + return class_id; } /** @@ -150,6 +158,84 @@ public: } }; +template<typename Schema, typename Encoding> +class data_client<Schema, Encoding, rmt::Loopback> final { +private: + ptr<data_server<Schema, Encoding, rmt::Loopback>> srv_; + + uint64_t next_id_; +public: + data_client(ptr<data_server<Schema, Encoding, rmt::Loopback>> srv__): + srv_{srv__}, + next_id_{0u} + {} + + /** + * Send data to the remote. + */ + error_or<id<Schema>> send(const data<Schema, Encoding>& dat){ + id<Schema> dat_id{next_id_}; + auto eov = srv_().send(dat, dat_id); + if(eov.is_error()){ + auto& err = eov.get_error(); + return std::move(err); + } + + ++next_id_; + return dat_id; + } + + /** + * Preallocate data + */ + error_or<id<Schema>> allocate(const data<typename meta_schema<Schema>::MetaSchema, Encoding>& meta){ + id<Schema> dat_id{next_id_}; + auto eov = srv_().allocate(meta, dat_id); + if(eov.is_error()){ + auto& err = eov.get_error(); + return std::move(err); + } + ++next_id_; + return dat_id; + } + + /** + * Receive data + */ + conveyor<data<Schema, Encoding, storage::Default>> receive(id<Schema> dat_id){ + auto eov = srv_().receive(dat_id); + if(eov.is_error()){ + auto& err = eov.get_error(); + return std::move(err); + } + + auto& val = eov.get_value(); + return std::move(val); + } + + /** + * Erase data + */ + error_or<void> erase(id<Schema> dat_id){ + return srv_().erase(dat_id); + } + + /** + * An exception for the Loopback backend. Here we can safely use find from + * the client side. + */ + error_or<ref<data<Schema, Encoding>>> find(id<Schema> dat_id){ + auto eov = srv_().find(dat_id); + if(eov.is_error()){ + auto& err = eov.get_error(); + return std::move(err); + } + + auto val = eov.get_value(); + return {*val}; + } +}; + /** * Client for transporting data to remote and receiving data back */ @@ -159,7 +245,7 @@ private: /** * Corresponding server for this client */ - data_server<tmpl_group<Schema...>, Encoding, rmt::Loopback>* srv_; + ptr<data_server<tmpl_group<Schema...>, Encoding, rmt::Loopback>> srv_; /** * The next id for identifying issues on the remote side. @@ -169,18 +255,21 @@ public: /** * Main constructor */ - data_client(data_server<tmpl_group<Schema...>, Encoding, rmt::Loopback>& srv__): - srv_{&srv__}, + data_client(ptr<data_server<tmpl_group<Schema...>, Encoding, rmt::Loopback>> srv__): + srv_{srv__}, next_id_{0u} {} + SAW_FORBID_COPY(data_client); + SAW_DEFAULT_MOVE(data_client); + /** * Send data to the remote. */ template<typename Sch> error_or<id<Sch>> send(const data<Sch, Encoding, storage::Default>& dat){ id<Sch> dat_id{next_id_}; - auto eov = srv_->send(dat, dat_id); + auto eov = srv_().send(dat, dat_id); if(eov.is_error()){ auto& err = eov.get_error(); return std::move(err); @@ -196,7 +285,7 @@ public: template<typename Sch> error_or<id<Sch>> allocate(const data<typename meta_schema<Sch>::MetaSchema, Encoding>& meta){ id<Sch> dat_id{next_id_}; - auto eov = srv_->allocate(meta, dat_id); + auto eov = srv_().allocate(meta, dat_id); if(eov.is_error()){ auto& err = eov.get_error(); return std::move(err); @@ -210,7 +299,7 @@ public: */ template<typename Sch> conveyor<data<Sch, Encoding, storage::Default>> receive(id<Sch> dat_id){ - auto eov = srv_->receive(dat_id); + auto eov = srv_().receive(dat_id); if(eov.is_error()){ auto& err = eov.get_error(); return std::move(err); @@ -225,7 +314,7 @@ public: */ template<typename Sch> error_or<void> erase(id<Sch> dat_id){ - return srv_->erase(dat_id); + return srv_().erase(dat_id); } /** @@ -234,7 +323,7 @@ public: */ template<typename Sch> error_or<data<Sch, Encoding>*> find(id<Sch> dat_id){ - auto eov = srv_->find(dat_id); + auto eov = srv_().find(dat_id); if(eov.is_error()){ auto& err = eov.get_error(); return std::move(err); @@ -243,7 +332,6 @@ public: auto val = eov.get_value(); return val; } - }; } diff --git a/modules/remote/tests/remote_loopback.cpp b/modules/remote/tests/remote_loopback.cpp index 2430029..6ecf4e0 100644 --- a/modules/remote/tests/remote_loopback.cpp +++ b/modules/remote/tests/remote_loopback.cpp @@ -28,10 +28,16 @@ SAW_TEST("Remote Loopback Data"){ auto eov = rmt.parse_address(0u); SAW_EXPECT(eov.is_value(), "Didn't parse correctly"); - auto& val = eov.get_value(); + auto& addr = eov.get_value(); - auto srv = data_server<sch::GroupedSchemas, encode::Native, rmt::Loopback>{val}; - auto client = data_client<sch::GroupedSchemas, encode::Native, rmt::Loopback>{val}; + auto eo_srv = rmt.template data_listen<sch::GroupedSchemas, encode::Native>(*addr); + SAW_EXPECT(eo_srv.is_value(), std::string{"Couldn't listen: "} + std::string{eo_srv.get_error().get_category()}); + auto& srv = eo_srv.get_value(); + + auto cvr_client = rmt.template data_connect<sch::GroupedSchemas, encode::Native>(*addr); + auto eo_client = cvr_client.take(); + SAW_EXPECT(eo_client.is_value(), "Couldn't connect."); + auto& client = eo_client.get_value(); data<sch::UInt64> foo{421}; id<sch::UInt64> sent_id = [&](){ |