diff options
Diffstat (limited to 'modules/codec')
-rw-r--r-- | modules/codec/.nix/derivation.nix | 1 | ||||
-rw-r--r-- | modules/codec/SConstruct | 5 | ||||
-rw-r--r-- | modules/codec/c++/remote.hpp (renamed from modules/codec/c++/rpc.hpp) | 4 | ||||
-rw-r--r-- | modules/codec/c++/remote_loopback.hpp | 16 | ||||
-rw-r--r-- | modules/codec/c++/transfer.hpp | 12 | ||||
-rw-r--r-- | modules/codec/c++/transfer_loopback.hpp | 140 | ||||
-rw-r--r-- | modules/codec/tests/remote_loopback.cpp | 51 |
7 files changed, 212 insertions, 17 deletions
diff --git a/modules/codec/.nix/derivation.nix b/modules/codec/.nix/derivation.nix index db27a5c..7f7811a 100644 --- a/modules/codec/.nix/derivation.nix +++ b/modules/codec/.nix/derivation.nix @@ -17,6 +17,7 @@ in stdenv.mkDerivation { buildInputs = [ forstio.core + forstio.async ]; nativeBuildInputs = [ diff --git a/modules/codec/SConstruct b/modules/codec/SConstruct index d4bc519..3773a17 100644 --- a/modules/codec/SConstruct +++ b/modules/codec/SConstruct @@ -47,7 +47,10 @@ env=Environment(ENV=os.environ, variables=env_vars, CPPPATH=[], CXX=['c++'], CPPDEFINES=['SAW_UNIX'], CXXFLAGS=['-std=c++20','-g','-Wall','-Wextra'], - LIBS=['forstio-core'] + LIBS=[ + 'forstio-core', + 'forstio-async' + ] ) env.__class__.add_source_files = add_kel_source_files env.Tool('compilation_db'); diff --git a/modules/codec/c++/rpc.hpp b/modules/codec/c++/remote.hpp index 6b63580..5803154 100644 --- a/modules/codec/c++/rpc.hpp +++ b/modules/codec/c++/remote.hpp @@ -1,10 +1,10 @@ #pragma once #include <forstio/id.hpp> -#include <forstio/codec/data.hpp> #include <forstio/async/async.hpp> -#include <forstio/codec/interface.hpp> +#include "data.hpp" +#include "interface.hpp" #include <variant> diff --git a/modules/codec/c++/remote_loopback.hpp b/modules/codec/c++/remote_loopback.hpp index 22ca15f..2949243 100644 --- a/modules/codec/c++/remote_loopback.hpp +++ b/modules/codec/c++/remote_loopback.hpp @@ -1,9 +1,9 @@ #pragma once -#include <forstio/codec/interface.hpp> - #include <variant> +#include "interface.hpp" +#include "remote.hpp" #include "transfer_loopback.hpp" namespace saw { @@ -37,7 +37,7 @@ class rpc_client<Iface, Encoding, Storage, rmt::Loopback> { * request the data from the remote */ template<typename IdT> - remote_data<IdT, Encoding, Storage, Remote> request_data(id<IdT> data); + remote_data<IdT, Encoding, Storage, rmt::Loopback> request_data(id<IdT> data); /** @todo * Determine type based on Name @@ -64,19 +64,23 @@ private: const remote_address<rmt::Loopback>* addr_; InterfaceT iface_; public: - rpc_server(const remode_address<rmt::Loopback>& addr__, InterfaceT iface__): + rpc_server(const remote_address<rmt::Loopback>& addr__, InterfaceT iface__): addr_{&addr__}, iface_{std::move(iface__)} {} + + // error_or<id<>> }; template<> class remote<rmt::Loopback> { - +public: /** * Resolves an address for the remote */ - conveyor<remote_address<rmt::Loopback>> resolve_address(); + error_or<own<remote_address<rmt::Loopback>>> parse_address(){ + return heap<remote_address<rmt::Loopback>>(); + } /** * Connect to a remote diff --git a/modules/codec/c++/transfer.hpp b/modules/codec/c++/transfer.hpp index 1fb297e..2fdd0b9 100644 --- a/modules/codec/c++/transfer.hpp +++ b/modules/codec/c++/transfer.hpp @@ -6,4 +6,16 @@ class data_server; template<typename Schema, typename Encoding, typename Remote> class data_client; + +namespace impl { +template<typename Encoding, typename Storage, typename T> +struct data_server_redux { + using type = std::tuple<>; +}; + +template<typename Encoding, typename Storage, typename... Schema> +struct data_server_redux<Encoding, Storage, tmpl_group<Schema...>> { + using type = std::tuple<std::unordered_map<uint64_t, data<Schema, Encoding, Storage>>...>; +}; +} } diff --git a/modules/codec/c++/transfer_loopback.hpp b/modules/codec/c++/transfer_loopback.hpp index ec61669..6c81c9c 100644 --- a/modules/codec/c++/transfer_loopback.hpp +++ b/modules/codec/c++/transfer_loopback.hpp @@ -1,20 +1,152 @@ #pragma once +#include <forstio/reduce_templates.hpp> + +#include "transfer.hpp" + namespace saw { namespace rmt { struct Loopback {}; } -template<typename Schema, typename Encoding> -class data_server<Schema, Encoding, rmt::Loopback> { +template<typename... Schema, typename Encoding> +class data_server<tmpl_group<Schema...>, Encoding, rmt::Loopback> { private: + typename impl::data_server_redux<Encoding, storage::Default, typename tmpl_reduce<tmpl_group<Schema...>>::type>::type values_; public: + /** + * Get data from client + */ + template<typename Sch> + error_or<void> send(const data<Sch, Encoding, storage::Default>& dat, id<Sch> store_id){ + auto& vals = std::get<std::unordered_map<uint64_t, data<Sch,Encoding>>>(values_); + + try { + auto insert_res = vals.emplace(std::make_pair(store_id.get_value(), std::move(dat))); + if(!insert_res.second){ + return make_error<err::already_exists>(); + } + }catch(std::exception& ){ + return make_error<err::out_of_memory>(); + } + return void_t{}; + } + + template<typename Sch> + error_or<data<Sch, Encoding, storage::Default>> receive(id<Sch> store_id){ + auto& vals = std::get<std::unordered_map<uint64_t, data<Sch,Encoding>>>(values_); + + auto find_res = vals.find(store_id.get_value()); + if(find_res == vals.end()){ + return make_error<err::not_found>(); + } + + auto& dat = find_res->second; + return dat; + } + + template<typename Sch> + error_or<void> erase(id<Sch> store_id){ + auto& vals = std::get<std::unordered_map<uint64_t, data<Sch,Encoding>>>(values_); + auto erase_op = vals.erase(store_id.get_value()); + if(erase_op == 0u){ + return make_error<err::not_found>(); + } + return void_t{}; + } + + template<typename Sch> + error_or<data<Sch, Encoding, storage::Default>*> find(id<Sch> store_id){ + auto& vals = std::get<std::unordered_map<uint64_t, data<Sch,Encoding>>>(values_); + auto find_res = vals.find(store_id.get_value()); + if(find_res == vals.end()){ + return make_error<err::not_found>(); + } + + auto& dat = find_res->second; + return &dat; + } }; -template<typename Schema, typename Encoding> -class data_client<Schema, Encoding, rmt::Loopback> { +/** + * Client for transporting data to remote and receiving data back + */ +template<typename... Schema, typename Encoding> +class data_client<tmpl_group<Schema...>, Encoding, rmt::Loopback> { private: + /** + * Corresponding server for this client + */ + data_server<tmpl_group<Schema...>, Encoding, rmt::Loopback>* srv_; + + /** + * The next id for identifying issues on the remote side. + */ + uint64_t next_id_; public: + /** + * Main constructor + */ + data_client(data_server<tmpl_group<Schema...>, Encoding, rmt::Loopback>& srv__): + srv_{&srv__}, + next_id_{0u} + {} + + /** + * Send data to the remote. + */ + template<typename Sch> + error_or<id<Sch>> send(const data<Sch, Encoding, storage::Default>& dat){ + id<Sch> dat_id{next_id_}; + auto eov = srv_->send(dat, dat_id); + if(eov.is_error()){ + auto& err = eov.get_error(); + return std::move(err); + } + + ++next_id_; + return dat_id; + } + + /** + * Receive data + */ + template<typename Sch> + conveyor<data<Sch, Encoding, storage::Default>> receive(id<Sch> dat_id){ + auto eov = srv_->receive(dat_id); + if(eov.is_error()){ + auto& err = eov.get_error(); + return std::move(err); + } + + auto& val = eov.get_value(); + return std::move(val); + } + + /** + * Erase data + */ + template<typename Sch> + error_or<void> erase(id<Sch> dat_id){ + return srv_->erase(dat_id); + } + + /** + * An exception for the Loopback backend. Here we can safely use find from + * the client side. + */ + template<typename Sch> + error_or<data<Sch, Encoding>*> find(id<Sch> dat_id){ + auto eov = srv_->find(dat_id); + if(eov.is_error()){ + auto& err = eov.get_error(); + return std::move(err); + } + + auto val = eov.get_value(); + return val; + } + }; } diff --git a/modules/codec/tests/remote_loopback.cpp b/modules/codec/tests/remote_loopback.cpp index 2f6c06c..2895ee7 100644 --- a/modules/codec/tests/remote_loopback.cpp +++ b/modules/codec/tests/remote_loopback.cpp @@ -1,6 +1,6 @@ #include <forstio/test/suite.hpp> -#include "remote_loopback.hpp" +#include "../c++/remote_loopback.hpp" namespace { namespace sch { @@ -9,9 +9,16 @@ using namespace saw::schema; using TestInterface = Interface< Member<Function<UInt32, Int64>, "foo"> >; + +using GroupedSchemas = saw::tmpl_group< + UInt64, + String, + Array<Int32>, + Float64 +>; } -SAW_TEST("Remote Loopback"){ +SAW_TEST("Remote Loopback Data"){ using namespace saw; remote<rmt::Loopback> rmt; @@ -22,9 +29,45 @@ SAW_TEST("Remote Loopback"){ interface<sch::TestInterface, encode::Native, storage::Default> iface{ [](data<sch::UInt32>& foo){ - return foo.template cast<sch::Int64>(); + return foo.template cast_to<sch::Int64>(); } }; - auto rpc_srv = rmt.listen(*val, std::move(iface)); + + auto srv = data_server<sch::GroupedSchemas, encode::Native, rmt::Loopback>{}; + auto client = data_client<sch::GroupedSchemas, encode::Native, rmt::Loopback>{srv}; + + data<sch::UInt64> foo{421}; + id<sch::UInt64> sent_id = [&](){ + auto eov = client.send(foo); + SAW_EXPECT(eov.is_value(), "Failed send."); + return eov.get_value(); + }(); + + event_loop loop; + wait_scope wait{loop}; + + { + auto conv = client.receive(sent_id); + auto eov = conv.take(); + + SAW_EXPECT(eov.is_value(), "Failed receive."); + SAW_EXPECT(eov.get_value() == foo, "Wrong received value."); + } + { + auto eov = client.find(sent_id); + SAW_EXPECT(eov.is_value(), "Failed find."); + auto& f_val = eov.get_value(); + SAW_EXPECT(f_val, "Nullptr in find."); + SAW_EXPECT(*f_val == foo, "Wrong received value."); + } + { + auto eov = client.erase(sent_id); + SAW_EXPECT(eov.is_value(), "Failed erase."); + } + { + auto conv = client.receive(sent_id); + auto eov = conv.take(); + SAW_EXPECT(!eov.is_value(), "Failed receive. Value should already be erased."); + } } } |