summaryrefslogtreecommitdiff
path: root/modules/io_codec/c++/io_peer.hpp
diff options
context:
space:
mode:
authorClaudius "keldu" Holeksa <mail@keldu.de>2024-03-07 10:22:17 +0100
committerClaudius "keldu" Holeksa <mail@keldu.de>2024-03-07 10:22:17 +0100
commitd2520819150d9a794baf7505d7d02dadeacd5266 (patch)
tree4bd9faa4f75aa0ed148bf7f8a70e8b058826fd57 /modules/io_codec/c++/io_peer.hpp
parent617538aa3278a4ca03b017d92047ba6d4027f30b (diff)
io_codec: Moved to new dir structure
Diffstat (limited to 'modules/io_codec/c++/io_peer.hpp')
-rw-r--r--modules/io_codec/c++/io_peer.hpp104
1 files changed, 104 insertions, 0 deletions
diff --git a/modules/io_codec/c++/io_peer.hpp b/modules/io_codec/c++/io_peer.hpp
new file mode 100644
index 0000000..9ba623f
--- /dev/null
+++ b/modules/io_codec/c++/io_peer.hpp
@@ -0,0 +1,104 @@
+#pragma once
+
+#include <forstio/async/async.hpp>
+#include <forstio/buffer.hpp>
+#include <forsto/io/io.hpp>
+#include <forstio/schema/message.hpp>
+
+namespace saw {
+
+template <typename Codec, typename Incoming, typename Outgoing,
+ typename InContainer = message_container<Incoming>,
+ typename OutContainer = message_container<Outgoing>,
+ typename BufferT = ring_buffer>
+class streaming_io_peer {
+public:
+ /**
+ *
+ */
+ streaming_io_peer(
+ own<conveyor_feeder<heap_message_root<Incoming, InContainer>>> feed,
+ own<async_io_stream> stream, Codec codec, BufferT in, BufferT out);
+ /**
+ *
+ */
+ streaming_io_peer(
+ own<conveyor_feeder<heap_message_root<Incoming, InContainer>>> feed,
+ own<async_io_stream> stream);
+
+ /**
+ * Deleted copy and move constructors
+ */
+ SAW_FORBID_COPY(streaming_io_peer);
+ SAW_FORBID_MOVE(streaming_io_peer);
+
+ /**
+ * Send a message to the remote peer
+ */
+ error send(heap_message_root<Outgoing, OutContainer> builder);
+
+ /**
+ * A phantom conveyor feeder. Meant for interfacing with other components
+ */
+ conveyor_feeder<heap_message_root<Outgoing, OutContainer>> &feeder();
+
+ conveyor<void> on_read_disconnected();
+
+private:
+ /// @unimplemented
+ class peer_conveyor_feeder final
+ : public conveyor_feeder<heap_message_root<Outgoing, OutContainer>> {
+ public:
+ peer_conveyor_feeder(
+ streaming_io_peer<Codec, Incoming, Outgoing, InContainer,
+ OutContainer, BufferT> &peer_)
+ : peer_{peer_} {}
+
+ void feed(heap_message_root<Outgoing, OutContainer> &&data) override {
+ (void)data;
+ }
+
+ void fail(error &&error) override { (void)error; }
+
+ size_t space() const override { return 0; }
+
+ size_t queued() const override { return 0; }
+
+ private:
+ streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer,
+ BufferT> &peer_;
+ };
+
+private:
+ own<conveyor_feeder<heap_message_root<Incoming, InContainer>>>
+ incoming_feeder_ = nullptr;
+
+ own<async_io_stream> io_stream_;
+
+ Codec codec_;
+
+ BufferT in_buffer_;
+ BufferT out_buffer_;
+
+ conveyor_sink sink_read_;
+ conveyor_sink sink_write_;
+
+ peer_conveyor_feeder conveyor_feeder_;
+};
+
+/**
+ * Setup new streaming io peer with the provided network protocols.
+ * This is a convenience wrapper intended for a faster setup of this class
+ */
+template <typename Codec, typename Incoming, typename Outgoing,
+ typename InContainer = message_container<Incoming>,
+ typename OutContainer = message_container<Outgoing>,
+ typename BufferT = ring_buffer>
+std::pair<own<streaming_io_peer<Codec, Incoming, Outgoing, InContainer,
+ OutContainer, BufferT>>,
+ conveyor<heap_message_root<Incoming, InContainer>>>
+new_streaming_io_peer(own<async_io_stream> stream);
+
+} // namespace saw
+
+#include "io_peer.tmpl.hpp"