summaryrefslogtreecommitdiff
path: root/modules/io_codec/c++
diff options
context:
space:
mode:
Diffstat (limited to 'modules/io_codec/c++')
-rw-r--r--modules/io_codec/c++/SConscript38
-rw-r--r--modules/io_codec/c++/io_peer.hpp104
-rw-r--r--modules/io_codec/c++/io_peer.tmpl.hpp117
-rw-r--r--modules/io_codec/c++/rpc.hpp31
4 files changed, 290 insertions, 0 deletions
diff --git a/modules/io_codec/c++/SConscript b/modules/io_codec/c++/SConscript
new file mode 100644
index 0000000..2a277cb
--- /dev/null
+++ b/modules/io_codec/c++/SConscript
@@ -0,0 +1,38 @@
+#!/bin/false
+
+import os
+import os.path
+import glob
+
+
+Import('env')
+
+dir_path = Dir('.').abspath
+
+# Environment for base library
+io_env = env.Clone();
+
+io_env.sources = sorted(glob.glob(dir_path + "/*.cpp"))
+io_env.headers = sorted(glob.glob(dir_path + "/*.hpp"))
+
+env.sources += io_env.sources;
+env.headers += io_env.headers;
+
+## Shared lib
+objects_shared = []
+io_env.add_source_files(objects_shared, io_env.sources, shared=True);
+io_env.library_shared = io_env.SharedLibrary('#build/forstio-io_codec', [objects_shared]);
+
+## Static lib
+objects_static = []
+io_env.add_source_files(objects_static, io_env.sources, shared=False);
+io_env.library_static = io_env.StaticLibrary('#build/forstio-io_codec', [objects_static]);
+
+# Set Alias
+env.Alias('library_io_codec', [io_env.library_shared, io_env.library_static]);
+
+env.targets += ['library_io_codec'];
+
+# Install
+env.Install('$prefix/lib/', [io_env.library_shared, io_env.library_static]);
+env.Install('$prefix/include/forstio/io_codec/', [io_env.headers]);
diff --git a/modules/io_codec/c++/io_peer.hpp b/modules/io_codec/c++/io_peer.hpp
new file mode 100644
index 0000000..9ba623f
--- /dev/null
+++ b/modules/io_codec/c++/io_peer.hpp
@@ -0,0 +1,104 @@
+#pragma once
+
+#include <forstio/async/async.hpp>
+#include <forstio/buffer.hpp>
+#include <forsto/io/io.hpp>
+#include <forstio/schema/message.hpp>
+
+namespace saw {
+
+template <typename Codec, typename Incoming, typename Outgoing,
+ typename InContainer = message_container<Incoming>,
+ typename OutContainer = message_container<Outgoing>,
+ typename BufferT = ring_buffer>
+class streaming_io_peer {
+public:
+ /**
+ *
+ */
+ streaming_io_peer(
+ own<conveyor_feeder<heap_message_root<Incoming, InContainer>>> feed,
+ own<async_io_stream> stream, Codec codec, BufferT in, BufferT out);
+ /**
+ *
+ */
+ streaming_io_peer(
+ own<conveyor_feeder<heap_message_root<Incoming, InContainer>>> feed,
+ own<async_io_stream> stream);
+
+ /**
+ * Deleted copy and move constructors
+ */
+ SAW_FORBID_COPY(streaming_io_peer);
+ SAW_FORBID_MOVE(streaming_io_peer);
+
+ /**
+ * Send a message to the remote peer
+ */
+ error send(heap_message_root<Outgoing, OutContainer> builder);
+
+ /**
+ * A phantom conveyor feeder. Meant for interfacing with other components
+ */
+ conveyor_feeder<heap_message_root<Outgoing, OutContainer>> &feeder();
+
+ conveyor<void> on_read_disconnected();
+
+private:
+ /// @unimplemented
+ class peer_conveyor_feeder final
+ : public conveyor_feeder<heap_message_root<Outgoing, OutContainer>> {
+ public:
+ peer_conveyor_feeder(
+ streaming_io_peer<Codec, Incoming, Outgoing, InContainer,
+ OutContainer, BufferT> &peer_)
+ : peer_{peer_} {}
+
+ void feed(heap_message_root<Outgoing, OutContainer> &&data) override {
+ (void)data;
+ }
+
+ void fail(error &&error) override { (void)error; }
+
+ size_t space() const override { return 0; }
+
+ size_t queued() const override { return 0; }
+
+ private:
+ streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer,
+ BufferT> &peer_;
+ };
+
+private:
+ own<conveyor_feeder<heap_message_root<Incoming, InContainer>>>
+ incoming_feeder_ = nullptr;
+
+ own<async_io_stream> io_stream_;
+
+ Codec codec_;
+
+ BufferT in_buffer_;
+ BufferT out_buffer_;
+
+ conveyor_sink sink_read_;
+ conveyor_sink sink_write_;
+
+ peer_conveyor_feeder conveyor_feeder_;
+};
+
+/**
+ * Setup new streaming io peer with the provided network protocols.
+ * This is a convenience wrapper intended for a faster setup of this class
+ */
+template <typename Codec, typename Incoming, typename Outgoing,
+ typename InContainer = message_container<Incoming>,
+ typename OutContainer = message_container<Outgoing>,
+ typename BufferT = ring_buffer>
+std::pair<own<streaming_io_peer<Codec, Incoming, Outgoing, InContainer,
+ OutContainer, BufferT>>,
+ conveyor<heap_message_root<Incoming, InContainer>>>
+new_streaming_io_peer(own<async_io_stream> stream);
+
+} // namespace saw
+
+#include "io_peer.tmpl.hpp"
diff --git a/modules/io_codec/c++/io_peer.tmpl.hpp b/modules/io_codec/c++/io_peer.tmpl.hpp
new file mode 100644
index 0000000..880a58a
--- /dev/null
+++ b/modules/io_codec/c++/io_peer.tmpl.hpp
@@ -0,0 +1,117 @@
+namespace saw {
+
+template <typename Codec, typename Incoming, typename Outgoing,
+ typename InContainer, typename OutContainer, typename BufferT>
+streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer,
+ BufferT>::
+ streaming_io_peer(
+ own<conveyor_feeder<heap_message_root<Incoming, InContainer>>> feed,
+ own<async_io_stream> str)
+ : streaming_io_peer{std::move(feed), std::move(str), {}, {}, {}} {}
+
+template <typename Codec, typename Incoming, typename Outgoing,
+ typename InContainer, typename OutContainer, typename BufferT>
+streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer,
+ BufferT>::
+ streaming_io_peer(
+ own<conveyor_feeder<heap_message_root<Incoming, InContainer>>> feed,
+ own<async_io_stream> stream, Codec codec, BufferT in, BufferT out)
+ : incoming_feeder_{std::move(feed)},
+ io_stream_{std::move(stream)}, codec_{std::move(codec)},
+ in_buffer_{std::move(in)}, out_buffer_{std::move(out)},
+ sink_read_{
+ io_stream_->read_done()
+ .then([this](size_t bytes) -> error_or<void> {
+ in_buffer_.write_advance(bytes);
+
+ if (in_buffer_.write_segment_length() == 0) {
+ return critical_error("Message too long");
+ }
+
+ io_stream_->read(&in_buffer_.write(), 1,
+ in_buffer_.write_segment_length());
+
+ while (true) {
+ auto root = heap_message_root<Incoming, InContainer>();
+ auto builder = root.build();
+
+ error err = codec_.template decode<Incoming, InContainer>(
+ builder, in_buffer_);
+ if (err.is_critical()) {
+ return err;
+ }
+
+ if (!err.failed()) {
+ incoming_feeder_->feed(std::move(root));
+ } else {
+ break;
+ }
+ }
+
+ return void_t{};
+ })
+ .sink([this](error err) {
+ incoming_feeder_->fail(err.copy_error());
+
+ return err;
+ })},
+ sink_write_{io_stream_->write_done()
+ .then([this](size_t bytes) -> error_or<void> {
+ out_buffer_.read_advance(bytes);
+ if (out_buffer_.readCompositeLength() > 0) {
+ io_stream_->write(
+ &out_buffer_.read(),
+ out_buffer_.read_segment_length());
+ }
+
+ return void_t{};
+ })
+ .sink()} {
+ io_stream_->read(&in_buffer_.write(), 1, in_buffer_.write_segment_length());
+}
+
+template <typename Codec, typename Incoming, typename Outgoing,
+ typename InContainer, typename OutContainer, typename BufferT>
+error streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer,
+ BufferT>::send(heap_message_root<Outgoing, OutContainer>
+ msg) {
+ bool restart_write = out_buffer_.read_segment_length() == 0;
+
+ error err =
+ codec_.template encode<Outgoing, OutContainer>(msg.read(), out_buffer_);
+ if (err.failed()) {
+ return err;
+ }
+
+ if (restart_write) {
+ io_stream_->write(&out_buffer_.read(),
+ out_buffer_.read_segment_length());
+ }
+
+ return no_error();
+}
+
+template <typename Codec, typename Incoming, typename Outgoing,
+ typename InContainer, typename OutContainer, typename BufferT>
+conveyor<void>
+streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer,
+ BufferT>::on_read_disconnected() {
+ return io_stream_->on_read_disconnected();
+}
+
+template <typename Codec, typename Incoming, typename Outgoing,
+ typename InContainer, typename OutContainer, typename BufferT>
+std::pair<own<streaming_io_peer<Codec, Incoming, Outgoing, InContainer,
+ OutContainer, BufferT>>,
+ conveyor<heap_message_root<Incoming, InContainer>>>
+newstreaming_io_peer(own<async_io_stream> stream) {
+ auto caf =
+ new_conveyor_and_feeder<heap_message_root<Incoming, InContainer>>();
+
+ return {heap<streaming_io_peer<Codec, Incoming, Outgoing, InContainer,
+ OutContainer, BufferT>>(
+ std::move(caf.feeder), std::move(stream)),
+ std::move(caf.conveyor)};
+}
+
+} // namespace saw
diff --git a/modules/io_codec/c++/rpc.hpp b/modules/io_codec/c++/rpc.hpp
new file mode 100644
index 0000000..947ce54
--- /dev/null
+++ b/modules/io_codec/c++/rpc.hpp
@@ -0,0 +1,31 @@
+#pragma once
+
+#include <forstio/codec/rpc.hpp>
+
+namespace saw {
+namespace rmt {
+struct Network {};
+}
+
+template<>
+class remote<rmt::Network> {
+private:
+ std::string addr_str_;
+public:
+ remote(std::string addr_str);
+
+ template<typename Interface>
+ conveyor<rpc_client<rmt::Network, Interface>> connect();
+};
+
+template<template Interface>
+class rpc_client<rmt::Network, Interface> {
+private:
+ own<io_stream> stream_;
+public:
+ rpc_client(own<io_stream> stream);
+
+ template<typename T>
+ remote_result<typename response<T>::type> call(typename request<T>::type req);
+};
+}