summaryrefslogtreecommitdiff
path: root/modules/remote
diff options
context:
space:
mode:
authorClaudius 'keldu' Holeksa <mail@keldu.de>2024-08-12 13:42:48 +0200
committerClaudius 'keldu' Holeksa <mail@keldu.de>2024-08-12 13:42:48 +0200
commite13f6a5e91ffeac86c32ab3a9048b810b1931061 (patch)
tree1eec0e8f5ca6edfd99581f5b175371a21c8f652b /modules/remote
parentb9a4cf706cf0145c814ef5987dad21ebc4172ac6 (diff)
wip
Diffstat (limited to 'modules/remote')
-rw-r--r--modules/remote/c++/remote.hpp1
-rw-r--r--modules/remote/c++/remote_loopback.hpp18
-rw-r--r--modules/remote/c++/remote_loopback_base.hpp66
-rw-r--r--modules/remote/c++/transfer.hpp14
-rw-r--r--modules/remote/c++/transfer_loopback.hpp136
-rw-r--r--modules/remote/tests/remote_loopback.cpp12
6 files changed, 194 insertions, 53 deletions
diff --git a/modules/remote/c++/remote.hpp b/modules/remote/c++/remote.hpp
index 72c9cce..571a4e7 100644
--- a/modules/remote/c++/remote.hpp
+++ b/modules/remote/c++/remote.hpp
@@ -102,7 +102,6 @@ public:
template<typename Remote>
class remote_address {
static_assert(always_false<Remote>, "Type of remote not supported");
-
/**
*
*/
diff --git a/modules/remote/c++/remote_loopback.hpp b/modules/remote/c++/remote_loopback.hpp
index 17f1bfe..fde410d 100644
--- a/modules/remote/c++/remote_loopback.hpp
+++ b/modules/remote/c++/remote_loopback.hpp
@@ -4,7 +4,7 @@
#include <forstio/codec/interface.hpp>
-#include "remote.hpp"
+#include "remote_loopback_base.hpp"
#include "transfer_loopback.hpp"
namespace saw {
@@ -53,19 +53,6 @@ class rpc_client<Iface, Encoding, Storage, rmt::Loopback> {
*/
};
-template<>
-class remote_address<rmt::Loopback> {
-private:
- data<schema::UInt64> addr_id_;
-public:
- remote_address(data<schema::UInt64> addr_id__):
- addr_id_{addr_id__}
- {}
-
- const data<schema::UInt64>& get_address_id() const {
- return addr_id_;
- }
-};
template<typename Iface, typename Encode, typename Storage>
class rpc_server<Iface, Encode, Storage, rmt::Loopback> {
@@ -81,8 +68,7 @@ public:
{}
// error_or<id<>>
- conveyor<
- > call
+ // conveyor<> call
};
}
diff --git a/modules/remote/c++/remote_loopback_base.hpp b/modules/remote/c++/remote_loopback_base.hpp
index 9b66fa8..87d4393 100644
--- a/modules/remote/c++/remote_loopback_base.hpp
+++ b/modules/remote/c++/remote_loopback_base.hpp
@@ -4,18 +4,42 @@
#include <map>
+#include <forstio/codec/schema_hash.hpp>
+
namespace saw {
namespace rmt {
struct Loopback {};
}
template<>
+class remote_address<rmt::Loopback> {
+private:
+ data<schema::UInt64> addr_id_;
+public:
+ remote_address(data<schema::UInt64> addr_id__):
+ addr_id_{addr_id__}
+ {}
+
+ const data<schema::UInt64>& get_address_id() const {
+ return addr_id_;
+ }
+};
+
+template<>
class remote<rmt::Loopback> {
private:
struct key_t {
std::array<uint64_t,3> data;
- bool operator<(const std::array<uint64_t,3>& rhs) const {
+ template<typename Schema, typename Encoding>
+ static key_t create(const remote_address<rmt::Loopback>& addr){
+ key_t k;
+ k.data = std::array<uint64_t,3>{addr.get_address_id().get(), schema_hash<Schema>::apply(), schema_hash<Encoding>::apply()};
+
+ return k;
+ }
+
+ bool operator<(const key_t& rhs) const {
for(uint64_t i = 0u; i < 3; ++i){
if(data[i] != rhs.data[i]){
return data[i] < rhs.data[i];
@@ -53,11 +77,16 @@ public:
*/
template<typename Schema, typename Encode>
error_or<own<data_server<Schema, Encode, rmt::Loopback>>> data_listen(const remote_address<rmt::Loopback>& addr){
- auto find = registered_data_servers_.find(addr.get_address_id());
- if(find == registered_data_servers_.end()){
+
+ auto key_v = key_t::template create<Schema,Encode>(addr);
+
+ auto find = registered_data_servers_.find(key_v);
+ if(find != registered_data_servers_.end()){
return make_error<err::already_exists>("Server already bound to this address");
}
- return {addr,*this};
+
+ auto dat_srv = heap<data_server<Schema, Encode, rmt::Loopback>>(*this, addr);
+ return dat_srv;
}
/**
@@ -65,16 +94,22 @@ public:
*/
template<typename Schema, typename Encode>
conveyor<data_client<Schema, Encode, rmt::Loopback>> data_connect(const remote_address<rmt::Loopback>& addr){
- auto class_id = srv.get_class_id();
+ constexpr auto class_id = data_server<Schema, Encode, rmt::Loopback>::class_id;
key_t key;
- key.data = {addr.get_address_id(), class_id.first, class_id.second};
+ key.data = std::array<uint64_t,3>{addr.get_address_id().get(), class_id.first, class_id.second};
auto find = registered_data_servers_.find(key);
if(find == registered_data_servers_.end()){
return make_error<err::not_found>("Server not found");
}
+ auto eo_dat_srv = find->second().template cast_to<data_server<Schema, Encode, rmt::Loopback>>();
+ if(eo_dat_srv.is_error()){
+ auto& err = eo_dat_srv.get_error();
+ return std::move(err);
+ }
+ auto dat_srv = eo_dat_srv.get_value();
-
+ return data_client<Schema, Encode, rmt::Loopback>{dat_srv};
}
/**
@@ -84,13 +119,26 @@ public:
auto class_id = srv.get_class_id();
key_t key;
- key.data = {addr.get_address_id(), class_id.first, class_id.second};
- auto insert = registered_data_servers_.emplace(std::make_pair(key, {srv}));
+ key.data = std::array<uint64_t,3>{addr.get_address_id().get(), class_id.first, class_id.second};
+
+ auto insert = registered_data_servers_.emplace(std::make_pair(key, ptr<i_data_server<rmt::Loopback>>{srv}));
if(insert.second){
return make_error<err::already_exists>("Server already bound to this address");
}
return make_void();
}
+
+ error_or<void> deregister_data_server(const remote_address<rmt::Loopback>& addr, i_data_server<rmt::Loopback>& srv){
+ auto class_id = srv.get_class_id();
+ key_t key;
+ key.data = std::array<uint64_t,3>{addr.get_address_id().get(), class_id.first, class_id.second};
+ auto erase = registered_data_servers_.erase(key);
+ if(erase == 0u){
+ return make_error<err::not_found>("Server not found");
+ }
+
+ return make_void();
+ }
};
}
diff --git a/modules/remote/c++/transfer.hpp b/modules/remote/c++/transfer.hpp
index 32416de..f68497b 100644
--- a/modules/remote/c++/transfer.hpp
+++ b/modules/remote/c++/transfer.hpp
@@ -2,6 +2,8 @@
#include <map>
+#include <forstio/common.hpp>
+
namespace saw {
template<typename Remote>
class i_data_server {
@@ -9,6 +11,18 @@ protected:
~i_data_server() = default;
public:
virtual std::pair<uint32_t,uint32_t> get_class_id() const = 0;
+
+ template<typename To>
+ error_or<ptr<To>> cast_to(){
+ {
+ auto rhs = get_class_id();
+ if(To::class_id.first == rhs.first && To::class_id.second == rhs.second){
+ return {ptr<To>{*static_cast<To*>(this)}};
+ }
+ }
+
+ return make_error<err::invalid_state>("Class IDs are not matching.");
+ }
};
template<typename Schema, typename Encoding, typename Remote>
diff --git a/modules/remote/c++/transfer_loopback.hpp b/modules/remote/c++/transfer_loopback.hpp
index 55a43fa..9e3c7f6 100644
--- a/modules/remote/c++/transfer_loopback.hpp
+++ b/modules/remote/c++/transfer_loopback.hpp
@@ -2,12 +2,10 @@
#include <forstio/reduce_templates.hpp>
+#include "remote_loopback.hpp"
#include "transfer.hpp"
namespace saw {
-namespace rmt {
-struct Loopback {};
-}
template<>
class remote<rmt::Loopback>;
@@ -22,15 +20,17 @@ private:
ptr<remote<rmt::Loopback>> remote_;
remote_address<rmt::Loopback> rmt_address_;
public:
+ static constexpr std::pair<uint64_t,uint64_t> class_id{schema_hash<Schema>::apply(), schema_hash<Encoding>::apply()};
+
data_server(ptr<remote<rmt::Loopback>> remote__, const remote_address<rmt::Loopback>& addr):
remote_{remote__},
rmt_address_{addr}
{
- remote_().register_server(addr);
+ remote_().register_data_server(addr, *this);
}
~data_server(){
- remote_().deregister_server(addr);
+ remote_().deregister_data_server(rmt_address_, *this);
}
SAW_FORBID_COPY(data_server);
@@ -40,10 +40,19 @@ public:
* Return the schema id
*/
std::pair<uint32_t,uint32_t> get_class_id() const override {
- uint32_t schema_hash = schema_hash<Schema>::apply();
- uint32_t encode_hash = schema_hash<Encoding>::apply();
-
- return std::make_pair(schema_hash, encode_hash);
+ return class_id;
+ }
+
+ error_or<void> send(const data<Schema, Encoding>& dat, id<Schema> store_id){
+ try {
+ auto insert_res = values_.emplace(std::make_pair(store_id.get_value(), std::move(dat)));
+ if(!insert_res.second){
+ return make_error<err::already_exists>();
+ }
+ }catch(std::exception& ){
+ return make_error<err::out_of_memory>();
+ }
+ return void_t{};
}
};
@@ -55,15 +64,17 @@ private:
ptr<remote<rmt::Loopback>> remote_;
remote_address<rmt::Loopback> rmt_address_;
public:
+ static constexpr std::pair<uint64_t,uint64_t> class_id{schema_hash<tmpl_group<Schema...>>::apply(), schema_hash<Encoding>::apply()};
+
data_server(ptr<remote<rmt::Loopback>> remote__, const remote_address<rmt::Loopback>& addr):
remote_{remote__},
rmt_address_{addr}
{
- remote_().register_server(addr);
+ remote_().register_data_server(addr,*this);
}
~data_server(){
- remote_().deregister_server(addr);
+ remote_().deregister_data_server(rmt_address_,*this);
}
SAW_FORBID_COPY(data_server);
@@ -73,10 +84,7 @@ public:
* Return the schema id
*/
std::pair<uint32_t,uint32_t> get_class_id() const override {
- uint32_t schema_hash = schema_hash<tmpl_group<Schema...>>::apply();
- uint32_t encode_hash = schema_hash<Encoding>::apply();
-
- return std::make_pair(schema_hash, encode_hash);
+ return class_id;
}
/**
@@ -150,6 +158,84 @@ public:
}
};
+template<typename Schema, typename Encoding>
+class data_client<Schema, Encoding, rmt::Loopback> final {
+private:
+ ptr<data_server<Schema, Encoding, rmt::Loopback>> srv_;
+
+ uint64_t next_id_;
+public:
+ data_client(ptr<data_server<Schema, Encoding, rmt::Loopback>> srv__):
+ srv_{srv__},
+ next_id_{0u}
+ {}
+
+ /**
+ * Send data to the remote.
+ */
+ error_or<id<Schema>> send(const data<Schema, Encoding>& dat){
+ id<Schema> dat_id{next_id_};
+ auto eov = srv_().send(dat, dat_id);
+ if(eov.is_error()){
+ auto& err = eov.get_error();
+ return std::move(err);
+ }
+
+ ++next_id_;
+ return dat_id;
+ }
+
+ /**
+ * Preallocate data
+ */
+ error_or<id<Schema>> allocate(const data<typename meta_schema<Schema>::MetaSchema, Encoding>& meta){
+ id<Schema> dat_id{next_id_};
+ auto eov = srv_().allocate(meta, dat_id);
+ if(eov.is_error()){
+ auto& err = eov.get_error();
+ return std::move(err);
+ }
+ ++next_id_;
+ return dat_id;
+ }
+
+ /**
+ * Receive data
+ */
+ conveyor<data<Schema, Encoding, storage::Default>> receive(id<Schema> dat_id){
+ auto eov = srv_().receive(dat_id);
+ if(eov.is_error()){
+ auto& err = eov.get_error();
+ return std::move(err);
+ }
+
+ auto& val = eov.get_value();
+ return std::move(val);
+ }
+
+ /**
+ * Erase data
+ */
+ error_or<void> erase(id<Schema> dat_id){
+ return srv_().erase(dat_id);
+ }
+
+ /**
+ * An exception for the Loopback backend. Here we can safely use find from
+ * the client side.
+ */
+ error_or<ref<data<Schema, Encoding>>> find(id<Schema> dat_id){
+ auto eov = srv_().find(dat_id);
+ if(eov.is_error()){
+ auto& err = eov.get_error();
+ return std::move(err);
+ }
+
+ auto val = eov.get_value();
+ return {*val};
+ }
+};
+
/**
* Client for transporting data to remote and receiving data back
*/
@@ -159,7 +245,7 @@ private:
/**
* Corresponding server for this client
*/
- data_server<tmpl_group<Schema...>, Encoding, rmt::Loopback>* srv_;
+ ptr<data_server<tmpl_group<Schema...>, Encoding, rmt::Loopback>> srv_;
/**
* The next id for identifying issues on the remote side.
@@ -169,18 +255,21 @@ public:
/**
* Main constructor
*/
- data_client(data_server<tmpl_group<Schema...>, Encoding, rmt::Loopback>& srv__):
- srv_{&srv__},
+ data_client(ptr<data_server<tmpl_group<Schema...>, Encoding, rmt::Loopback>> srv__):
+ srv_{srv__},
next_id_{0u}
{}
+ SAW_FORBID_COPY(data_client);
+ SAW_DEFAULT_MOVE(data_client);
+
/**
* Send data to the remote.
*/
template<typename Sch>
error_or<id<Sch>> send(const data<Sch, Encoding, storage::Default>& dat){
id<Sch> dat_id{next_id_};
- auto eov = srv_->send(dat, dat_id);
+ auto eov = srv_().send(dat, dat_id);
if(eov.is_error()){
auto& err = eov.get_error();
return std::move(err);
@@ -196,7 +285,7 @@ public:
template<typename Sch>
error_or<id<Sch>> allocate(const data<typename meta_schema<Sch>::MetaSchema, Encoding>& meta){
id<Sch> dat_id{next_id_};
- auto eov = srv_->allocate(meta, dat_id);
+ auto eov = srv_().allocate(meta, dat_id);
if(eov.is_error()){
auto& err = eov.get_error();
return std::move(err);
@@ -210,7 +299,7 @@ public:
*/
template<typename Sch>
conveyor<data<Sch, Encoding, storage::Default>> receive(id<Sch> dat_id){
- auto eov = srv_->receive(dat_id);
+ auto eov = srv_().receive(dat_id);
if(eov.is_error()){
auto& err = eov.get_error();
return std::move(err);
@@ -225,7 +314,7 @@ public:
*/
template<typename Sch>
error_or<void> erase(id<Sch> dat_id){
- return srv_->erase(dat_id);
+ return srv_().erase(dat_id);
}
/**
@@ -234,7 +323,7 @@ public:
*/
template<typename Sch>
error_or<data<Sch, Encoding>*> find(id<Sch> dat_id){
- auto eov = srv_->find(dat_id);
+ auto eov = srv_().find(dat_id);
if(eov.is_error()){
auto& err = eov.get_error();
return std::move(err);
@@ -243,7 +332,6 @@ public:
auto val = eov.get_value();
return val;
}
-
};
}
diff --git a/modules/remote/tests/remote_loopback.cpp b/modules/remote/tests/remote_loopback.cpp
index 2430029..6ecf4e0 100644
--- a/modules/remote/tests/remote_loopback.cpp
+++ b/modules/remote/tests/remote_loopback.cpp
@@ -28,10 +28,16 @@ SAW_TEST("Remote Loopback Data"){
auto eov = rmt.parse_address(0u);
SAW_EXPECT(eov.is_value(), "Didn't parse correctly");
- auto& val = eov.get_value();
+ auto& addr = eov.get_value();
- auto srv = data_server<sch::GroupedSchemas, encode::Native, rmt::Loopback>{val};
- auto client = data_client<sch::GroupedSchemas, encode::Native, rmt::Loopback>{val};
+ auto eo_srv = rmt.template data_listen<sch::GroupedSchemas, encode::Native>(*addr);
+ SAW_EXPECT(eo_srv.is_value(), std::string{"Couldn't listen: "} + std::string{eo_srv.get_error().get_category()});
+ auto& srv = eo_srv.get_value();
+
+ auto cvr_client = rmt.template data_connect<sch::GroupedSchemas, encode::Native>(*addr);
+ auto eo_client = cvr_client.take();
+ SAW_EXPECT(eo_client.is_value(), "Couldn't connect.");
+ auto& client = eo_client.get_value();
data<sch::UInt64> foo{421};
id<sch::UInt64> sent_id = [&](){