summaryrefslogtreecommitdiff
path: root/modules/io_codec/io_peer.h
diff options
context:
space:
mode:
authorClaudius "keldu" Holeksa <mail@keldu.de>2023-12-04 12:18:14 +0100
committerClaudius "keldu" Holeksa <mail@keldu.de>2023-12-04 12:18:14 +0100
commita14896f9ed209dd3f9597722e5a5697bd7dbf531 (patch)
tree089ca5cbbd206d1921f8f6b53292f5bc1902ca5c /modules/io_codec/io_peer.h
parent84ecdcbca9e55b1f57fbb832e12ff4fdbb86e7c9 (diff)
meta: Renamed folder containing source
Diffstat (limited to 'modules/io_codec/io_peer.h')
-rw-r--r--modules/io_codec/io_peer.h104
1 files changed, 104 insertions, 0 deletions
diff --git a/modules/io_codec/io_peer.h b/modules/io_codec/io_peer.h
new file mode 100644
index 0000000..b9a4b34
--- /dev/null
+++ b/modules/io_codec/io_peer.h
@@ -0,0 +1,104 @@
+#pragma once
+
+#include <forstio/async/async.h>
+#include <forstio/buffer.h>
+#include <forsto/io/io.h>
+#include <forstio/schema/message.h>
+
+namespace saw {
+
+template <typename Codec, typename Incoming, typename Outgoing,
+ typename InContainer = message_container<Incoming>,
+ typename OutContainer = message_container<Outgoing>,
+ typename BufferT = ring_buffer>
+class streaming_io_peer {
+public:
+ /**
+ *
+ */
+ streaming_io_peer(
+ own<conveyor_feeder<heap_message_root<Incoming, InContainer>>> feed,
+ own<async_io_stream> stream, Codec codec, BufferT in, BufferT out);
+ /**
+ *
+ */
+ streaming_io_peer(
+ own<conveyor_feeder<heap_message_root<Incoming, InContainer>>> feed,
+ own<async_io_stream> stream);
+
+ /**
+ * Deleted copy and move constructors
+ */
+ SAW_FORBID_COPY(streaming_io_peer);
+ SAW_FORBID_MOVE(streaming_io_peer);
+
+ /**
+ * Send a message to the remote peer
+ */
+ error send(heap_message_root<Outgoing, OutContainer> builder);
+
+ /**
+ * A phantom conveyor feeder. Meant for interfacing with other components
+ */
+ conveyor_feeder<heap_message_root<Outgoing, OutContainer>> &feeder();
+
+ conveyor<void> on_read_disconnected();
+
+private:
+ /// @unimplemented
+ class peer_conveyor_feeder final
+ : public conveyor_feeder<heap_message_root<Outgoing, OutContainer>> {
+ public:
+ peer_conveyor_feeder(
+ streaming_io_peer<Codec, Incoming, Outgoing, InContainer,
+ OutContainer, BufferT> &peer_)
+ : peer_{peer_} {}
+
+ void feed(heap_message_root<Outgoing, OutContainer> &&data) override {
+ (void)data;
+ }
+
+ void fail(error &&error) override { (void)error; }
+
+ size_t space() const override { return 0; }
+
+ size_t queued() const override { return 0; }
+
+ private:
+ streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer,
+ BufferT> &peer_;
+ };
+
+private:
+ own<conveyor_feeder<heap_message_root<Incoming, InContainer>>>
+ incoming_feeder_ = nullptr;
+
+ own<async_io_stream> io_stream_;
+
+ Codec codec_;
+
+ BufferT in_buffer_;
+ BufferT out_buffer_;
+
+ conveyor_sink sink_read_;
+ conveyor_sink sink_write_;
+
+ peer_conveyor_feeder conveyor_feeder_;
+};
+
+/**
+ * Setup new streaming io peer with the provided network protocols.
+ * This is a convenience wrapper intended for a faster setup of this class
+ */
+template <typename Codec, typename Incoming, typename Outgoing,
+ typename InContainer = message_container<Incoming>,
+ typename OutContainer = message_container<Outgoing>,
+ typename BufferT = ring_buffer>
+std::pair<own<streaming_io_peer<Codec, Incoming, Outgoing, InContainer,
+ OutContainer, BufferT>>,
+ conveyor<heap_message_root<Incoming, InContainer>>>
+new_streaming_io_peer(own<async_io_stream> stream);
+
+} // namespace saw
+
+#include "io_peer.tmpl.h"