summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorClaudius "keldu" Holeksa <mail@keldu.de>2024-07-05 16:50:33 +0200
committerClaudius "keldu" Holeksa <mail@keldu.de>2024-07-05 16:50:33 +0200
commit614d0bf07789457a97d194c4af9cc7393f871351 (patch)
treefffe8f702589603b397497e14ca217428c98edec
parent9f320474f1c19861d46f6c42a09f7bba9dc919bc (diff)
Working on loopback data transmission
-rw-r--r--modules/codec/.nix/derivation.nix1
-rw-r--r--modules/codec/SConstruct5
-rw-r--r--modules/codec/c++/remote.hpp (renamed from modules/codec/c++/rpc.hpp)4
-rw-r--r--modules/codec/c++/remote_loopback.hpp16
-rw-r--r--modules/codec/c++/transfer.hpp12
-rw-r--r--modules/codec/c++/transfer_loopback.hpp140
-rw-r--r--modules/codec/tests/remote_loopback.cpp51
-rw-r--r--modules/remote-sycl/c++/transfer.hpp17
8 files changed, 215 insertions, 31 deletions
diff --git a/modules/codec/.nix/derivation.nix b/modules/codec/.nix/derivation.nix
index db27a5c..7f7811a 100644
--- a/modules/codec/.nix/derivation.nix
+++ b/modules/codec/.nix/derivation.nix
@@ -17,6 +17,7 @@ in stdenv.mkDerivation {
buildInputs = [
forstio.core
+ forstio.async
];
nativeBuildInputs = [
diff --git a/modules/codec/SConstruct b/modules/codec/SConstruct
index d4bc519..3773a17 100644
--- a/modules/codec/SConstruct
+++ b/modules/codec/SConstruct
@@ -47,7 +47,10 @@ env=Environment(ENV=os.environ, variables=env_vars, CPPPATH=[],
CXX=['c++'],
CPPDEFINES=['SAW_UNIX'],
CXXFLAGS=['-std=c++20','-g','-Wall','-Wextra'],
- LIBS=['forstio-core']
+ LIBS=[
+ 'forstio-core',
+ 'forstio-async'
+ ]
)
env.__class__.add_source_files = add_kel_source_files
env.Tool('compilation_db');
diff --git a/modules/codec/c++/rpc.hpp b/modules/codec/c++/remote.hpp
index 6b63580..5803154 100644
--- a/modules/codec/c++/rpc.hpp
+++ b/modules/codec/c++/remote.hpp
@@ -1,10 +1,10 @@
#pragma once
#include <forstio/id.hpp>
-#include <forstio/codec/data.hpp>
#include <forstio/async/async.hpp>
-#include <forstio/codec/interface.hpp>
+#include "data.hpp"
+#include "interface.hpp"
#include <variant>
diff --git a/modules/codec/c++/remote_loopback.hpp b/modules/codec/c++/remote_loopback.hpp
index 22ca15f..2949243 100644
--- a/modules/codec/c++/remote_loopback.hpp
+++ b/modules/codec/c++/remote_loopback.hpp
@@ -1,9 +1,9 @@
#pragma once
-#include <forstio/codec/interface.hpp>
-
#include <variant>
+#include "interface.hpp"
+#include "remote.hpp"
#include "transfer_loopback.hpp"
namespace saw {
@@ -37,7 +37,7 @@ class rpc_client<Iface, Encoding, Storage, rmt::Loopback> {
* request the data from the remote
*/
template<typename IdT>
- remote_data<IdT, Encoding, Storage, Remote> request_data(id<IdT> data);
+ remote_data<IdT, Encoding, Storage, rmt::Loopback> request_data(id<IdT> data);
/** @todo
* Determine type based on Name
@@ -64,19 +64,23 @@ private:
const remote_address<rmt::Loopback>* addr_;
InterfaceT iface_;
public:
- rpc_server(const remode_address<rmt::Loopback>& addr__, InterfaceT iface__):
+ rpc_server(const remote_address<rmt::Loopback>& addr__, InterfaceT iface__):
addr_{&addr__},
iface_{std::move(iface__)}
{}
+
+ // error_or<id<>>
};
template<>
class remote<rmt::Loopback> {
-
+public:
/**
* Resolves an address for the remote
*/
- conveyor<remote_address<rmt::Loopback>> resolve_address();
+ error_or<own<remote_address<rmt::Loopback>>> parse_address(){
+ return heap<remote_address<rmt::Loopback>>();
+ }
/**
* Connect to a remote
diff --git a/modules/codec/c++/transfer.hpp b/modules/codec/c++/transfer.hpp
index 1fb297e..2fdd0b9 100644
--- a/modules/codec/c++/transfer.hpp
+++ b/modules/codec/c++/transfer.hpp
@@ -6,4 +6,16 @@ class data_server;
template<typename Schema, typename Encoding, typename Remote>
class data_client;
+
+namespace impl {
+template<typename Encoding, typename Storage, typename T>
+struct data_server_redux {
+ using type = std::tuple<>;
+};
+
+template<typename Encoding, typename Storage, typename... Schema>
+struct data_server_redux<Encoding, Storage, tmpl_group<Schema...>> {
+ using type = std::tuple<std::unordered_map<uint64_t, data<Schema, Encoding, Storage>>...>;
+};
+}
}
diff --git a/modules/codec/c++/transfer_loopback.hpp b/modules/codec/c++/transfer_loopback.hpp
index ec61669..6c81c9c 100644
--- a/modules/codec/c++/transfer_loopback.hpp
+++ b/modules/codec/c++/transfer_loopback.hpp
@@ -1,20 +1,152 @@
#pragma once
+#include <forstio/reduce_templates.hpp>
+
+#include "transfer.hpp"
+
namespace saw {
namespace rmt {
struct Loopback {};
}
-template<typename Schema, typename Encoding>
-class data_server<Schema, Encoding, rmt::Loopback> {
+template<typename... Schema, typename Encoding>
+class data_server<tmpl_group<Schema...>, Encoding, rmt::Loopback> {
private:
+ typename impl::data_server_redux<Encoding, storage::Default, typename tmpl_reduce<tmpl_group<Schema...>>::type>::type values_;
public:
+ /**
+ * Get data from client
+ */
+ template<typename Sch>
+ error_or<void> send(const data<Sch, Encoding, storage::Default>& dat, id<Sch> store_id){
+ auto& vals = std::get<std::unordered_map<uint64_t, data<Sch,Encoding>>>(values_);
+
+ try {
+ auto insert_res = vals.emplace(std::make_pair(store_id.get_value(), std::move(dat)));
+ if(!insert_res.second){
+ return make_error<err::already_exists>();
+ }
+ }catch(std::exception& ){
+ return make_error<err::out_of_memory>();
+ }
+ return void_t{};
+ }
+
+ template<typename Sch>
+ error_or<data<Sch, Encoding, storage::Default>> receive(id<Sch> store_id){
+ auto& vals = std::get<std::unordered_map<uint64_t, data<Sch,Encoding>>>(values_);
+
+ auto find_res = vals.find(store_id.get_value());
+ if(find_res == vals.end()){
+ return make_error<err::not_found>();
+ }
+
+ auto& dat = find_res->second;
+ return dat;
+ }
+
+ template<typename Sch>
+ error_or<void> erase(id<Sch> store_id){
+ auto& vals = std::get<std::unordered_map<uint64_t, data<Sch,Encoding>>>(values_);
+ auto erase_op = vals.erase(store_id.get_value());
+ if(erase_op == 0u){
+ return make_error<err::not_found>();
+ }
+ return void_t{};
+ }
+
+ template<typename Sch>
+ error_or<data<Sch, Encoding, storage::Default>*> find(id<Sch> store_id){
+ auto& vals = std::get<std::unordered_map<uint64_t, data<Sch,Encoding>>>(values_);
+ auto find_res = vals.find(store_id.get_value());
+ if(find_res == vals.end()){
+ return make_error<err::not_found>();
+ }
+
+ auto& dat = find_res->second;
+ return &dat;
+ }
};
-template<typename Schema, typename Encoding>
-class data_client<Schema, Encoding, rmt::Loopback> {
+/**
+ * Client for transporting data to remote and receiving data back
+ */
+template<typename... Schema, typename Encoding>
+class data_client<tmpl_group<Schema...>, Encoding, rmt::Loopback> {
private:
+ /**
+ * Corresponding server for this client
+ */
+ data_server<tmpl_group<Schema...>, Encoding, rmt::Loopback>* srv_;
+
+ /**
+ * The next id for identifying issues on the remote side.
+ */
+ uint64_t next_id_;
public:
+ /**
+ * Main constructor
+ */
+ data_client(data_server<tmpl_group<Schema...>, Encoding, rmt::Loopback>& srv__):
+ srv_{&srv__},
+ next_id_{0u}
+ {}
+
+ /**
+ * Send data to the remote.
+ */
+ template<typename Sch>
+ error_or<id<Sch>> send(const data<Sch, Encoding, storage::Default>& dat){
+ id<Sch> dat_id{next_id_};
+ auto eov = srv_->send(dat, dat_id);
+ if(eov.is_error()){
+ auto& err = eov.get_error();
+ return std::move(err);
+ }
+
+ ++next_id_;
+ return dat_id;
+ }
+
+ /**
+ * Receive data
+ */
+ template<typename Sch>
+ conveyor<data<Sch, Encoding, storage::Default>> receive(id<Sch> dat_id){
+ auto eov = srv_->receive(dat_id);
+ if(eov.is_error()){
+ auto& err = eov.get_error();
+ return std::move(err);
+ }
+
+ auto& val = eov.get_value();
+ return std::move(val);
+ }
+
+ /**
+ * Erase data
+ */
+ template<typename Sch>
+ error_or<void> erase(id<Sch> dat_id){
+ return srv_->erase(dat_id);
+ }
+
+ /**
+ * An exception for the Loopback backend. Here we can safely use find from
+ * the client side.
+ */
+ template<typename Sch>
+ error_or<data<Sch, Encoding>*> find(id<Sch> dat_id){
+ auto eov = srv_->find(dat_id);
+ if(eov.is_error()){
+ auto& err = eov.get_error();
+ return std::move(err);
+ }
+
+ auto val = eov.get_value();
+ return val;
+ }
+
};
}
diff --git a/modules/codec/tests/remote_loopback.cpp b/modules/codec/tests/remote_loopback.cpp
index 2f6c06c..2895ee7 100644
--- a/modules/codec/tests/remote_loopback.cpp
+++ b/modules/codec/tests/remote_loopback.cpp
@@ -1,6 +1,6 @@
#include <forstio/test/suite.hpp>
-#include "remote_loopback.hpp"
+#include "../c++/remote_loopback.hpp"
namespace {
namespace sch {
@@ -9,9 +9,16 @@ using namespace saw::schema;
using TestInterface = Interface<
Member<Function<UInt32, Int64>, "foo">
>;
+
+using GroupedSchemas = saw::tmpl_group<
+ UInt64,
+ String,
+ Array<Int32>,
+ Float64
+>;
}
-SAW_TEST("Remote Loopback"){
+SAW_TEST("Remote Loopback Data"){
using namespace saw;
remote<rmt::Loopback> rmt;
@@ -22,9 +29,45 @@ SAW_TEST("Remote Loopback"){
interface<sch::TestInterface, encode::Native, storage::Default> iface{
[](data<sch::UInt32>& foo){
- return foo.template cast<sch::Int64>();
+ return foo.template cast_to<sch::Int64>();
}
};
- auto rpc_srv = rmt.listen(*val, std::move(iface));
+
+ auto srv = data_server<sch::GroupedSchemas, encode::Native, rmt::Loopback>{};
+ auto client = data_client<sch::GroupedSchemas, encode::Native, rmt::Loopback>{srv};
+
+ data<sch::UInt64> foo{421};
+ id<sch::UInt64> sent_id = [&](){
+ auto eov = client.send(foo);
+ SAW_EXPECT(eov.is_value(), "Failed send.");
+ return eov.get_value();
+ }();
+
+ event_loop loop;
+ wait_scope wait{loop};
+
+ {
+ auto conv = client.receive(sent_id);
+ auto eov = conv.take();
+
+ SAW_EXPECT(eov.is_value(), "Failed receive.");
+ SAW_EXPECT(eov.get_value() == foo, "Wrong received value.");
+ }
+ {
+ auto eov = client.find(sent_id);
+ SAW_EXPECT(eov.is_value(), "Failed find.");
+ auto& f_val = eov.get_value();
+ SAW_EXPECT(f_val, "Nullptr in find.");
+ SAW_EXPECT(*f_val == foo, "Wrong received value.");
+ }
+ {
+ auto eov = client.erase(sent_id);
+ SAW_EXPECT(eov.is_value(), "Failed erase.");
+ }
+ {
+ auto conv = client.receive(sent_id);
+ auto eov = conv.take();
+ SAW_EXPECT(!eov.is_value(), "Failed receive. Value should already be erased.");
+ }
}
}
diff --git a/modules/remote-sycl/c++/transfer.hpp b/modules/remote-sycl/c++/transfer.hpp
index 72b111f..2a95f67 100644
--- a/modules/remote-sycl/c++/transfer.hpp
+++ b/modules/remote-sycl/c++/transfer.hpp
@@ -9,17 +9,6 @@
#include <forstio/codec/transfer.hpp>
namespace saw {
-namespace impl {
-template<typename Encoding, typename T>
-struct data_server_redux {
- using type = std::tuple<>;
-};
-
-template<typename Encoding, typename... Schema>
-struct data_server_redux<Encoding, tmpl_group<Schema...>> {
- using type = std::tuple<std::unordered_map<uint64_t, data<Schema, Encoding, rmt::Sycl>>...>;
-};
-}
template<typename... Schema, typename Encoding>
class data_server<tmpl_group<Schema...>, Encoding, rmt::Sycl> {
@@ -32,7 +21,7 @@ private:
/**
* Store for the data the server manages.
*/
- typename impl::data_server_redux<Encoding, typename tmpl_reduce<tmpl_group<Schema...>>::type >::type values_;
+ typename impl::data_server_redux<Encoding, rmt::Sycl, typename tmpl_reduce<tmpl_group<Schema...>>::type >::type values_;
public:
/**
* Main constructor
@@ -85,7 +74,7 @@ public:
*/
template<typename Sch>
error_or<void> erase(id<Sch> store_id){
- auto& vals = std::get<Sch>(values_);
+ auto& vals = std::get<std::unordered_map<uint64_t, data<Sch,Encoding,rmt::Sycl>>>(values_);
auto erase_op = vals.erase(store_id.get_value());
if(erase_op == 0u){
return make_error<err::not_found>();
@@ -99,7 +88,7 @@ public:
*/
template<typename Sch>
error_or<data<Sch, Encoding, rmt::Sycl>*> find(id<Sch> store_id){
- auto& vals = std::get<Sch>(values_);
+ auto& vals = std::get<std::unordered_map<uint64_t, data<Sch,Encoding,rmt::Sycl>>>(values_);
auto find_res = vals.find(store_id.get_value());
if(find_res == vals.end()){
return make_error<err::not_found>();