From 33777ffd67825b7c31343513ddfd733579096fc9 Mon Sep 17 00:00:00 2001 From: Claudius Holeksa Date: Sun, 13 Mar 2022 14:40:34 +0100 Subject: [PATCH 1/2] io peering class for easier impl --- source/forstio/io.h | 5 +- source/forstio/io_wrapper.h | 27 +++++++-- source/forstio/io_wrapper.tmpl.h | 96 ++++++++++++++++++++++++++++++++ 3 files changed, 121 insertions(+), 7 deletions(-) create mode 100644 source/forstio/io_wrapper.tmpl.h diff --git a/source/forstio/io.h b/source/forstio/io.h index 190ca53..1e9eeba 100644 --- a/source/forstio/io.h +++ b/source/forstio/io.h @@ -61,7 +61,7 @@ public: virtual Conveyor writeDone() = 0; }; -class AsyncIoStream : public AsyncInputStream, public AsyncOutputStream { +class AsyncIoStream final : public AsyncInputStream, public AsyncOutputStream { private: Own stream; @@ -75,6 +75,9 @@ private: public: AsyncIoStream(Own str); + SAW_FORBID_COPY(AsyncIoStream); + SAW_FORBID_MOVE(AsyncIoStream); + void read(void *buffer, size_t length, size_t max_length) override; Conveyor readDone() override; diff --git a/source/forstio/io_wrapper.h b/source/forstio/io_wrapper.h index 66588fb..8cab7ac 100644 --- a/source/forstio/io_wrapper.h +++ b/source/forstio/io_wrapper.h @@ -6,22 +6,37 @@ namespace saw { -template , class OutContainer = MessageContainer> +template , typename OutContainer = MessageContainer, typename BufferT = RingBuffer> class StreamingIoPeer { private: - Codec codec; + Own>> incoming_feeder = nullptr; Own io_stream; - Own>> incoming_feeder = nullptr; + Codec codec; + + BufferT in_buffer; + BufferT out_buffer; + + SinkConveyor sink_read; + SinkConveyor sink_write; + public: - StreamingIoPeer(Own stream); + StreamingIoPeer(Own>> feed, Own stream, Codec codec, BufferT in, BufferT out); + StreamingIoPeer(Own>> feed, Own stream); void send(HeapMessageRoot builder); - Conveyor startReadPump(); + Conveyor onReadDisconnected(); }; - +/** + * Setup new streaming io peer with the provided network protocols. + * This is a convenience wrapper intended for a faster setup of + */ +template , typename OutContainer = MessageContainer, typename BufferT = RingBuffer> +std::pair, Conveyor>> newStreamingIoPeer(Own stream); } // namespace saw + +#include "io_wrapper.tmpl.h" diff --git a/source/forstio/io_wrapper.tmpl.h b/source/forstio/io_wrapper.tmpl.h new file mode 100644 index 0000000..04cf16b --- /dev/null +++ b/source/forstio/io_wrapper.tmpl.h @@ -0,0 +1,96 @@ +namespace saw { + +template , typename OutContainer = MessageContainer, typename BufferT = RingBuffer> +StreamingIoPeer::StreamingIoPeer( +Own>> feed, +Own str +): + StreamingIoPeer{std::move(feed), std::move(str), {}, {}, {}} +{ +} + +template , typename OutContainer = MessageContainer, typename BufferT = RingBuffer> +StreamingIoPeer::StreamingIoPeer( +Own>> feed, +Own str, Codec codec, BufferT in, BufferT out): + incoming_feeder{std::move(feed)}, + io_stream{std::move(str)}, + codec{std::move(codec)}, + in_buffer{std::move(in)}, + out_buffer{std::move(out)}, + sink_read{ + io_stream->readDone().then([this](size_t bytes) -> ErrorOr { + in_buffer.writeAdvance(bytes); + + if(in_buffer.writeSegmentLength() == 0){ + return criticalError("Message too long"); + } + + io_stream->read(&in_buffer.write(), 1, in_buffer.writeSegmentLength()); + + while(true){ + auto root = heapMessageRoot(); + auto builder = root.build(); + + Error error = codec.template decode(builder, in_buffer); + if(error.isCritical()){ + return error; + } + + if(!error.failed()){ + incoming_feeder->handle(std::move(root)); + }else{ + break; + } + } + + return Void{}; + }).sink([this](Error error){ + incoming_feeder->fail(error); + + return error; + }) + }, + sink_write{ + io_stream->writeDone().then([this](size_bytes) -> ErrorOr { + out_buffer.readAdvance(bytes); + if(out_buffer.readCompositeLength() > 0){ + io_stream->write(&out_buffer.read(), out_buffer.readSegmengtLength()); + } + + return Void{}; + }).sink(); + } +{ + io_stream->read(&in_buffer.write(), 1, in_buffer.writeSegmentLength()); +} + +template , typename OutContainer = MessageContainer, typename BufferT = RingBuffer> +void StreamingIoPeer::send(HeapMessageRoot msg){ + bool restart_write = out_buffer.readSegmentLength() == 0; + + Error error = codec.template encode(msg.read(), out_buffer); + if(error.failed()){ + return error; + } + + if(restart_write){ + io_stream->write(&out_buffer.read(), out_buffer.readSegmentLength()); + } + + return noError(); +} + +template , typename OutContainer = MessageContainer, typename BufferT = RingBuffer> +Conveyor StreamingIoPeer::onReadDisconnected(){ + return io_stream->onReadDisconnected(); +} + +template , typename OutContainer = MessageContainer, typename BufferT = RingBuffer> +std::pair, Conveyor>> newStreamingIoPeer(Own stream){ + auto caf = newConveyorAndFeeder>(); + + return {{std::move(caf.feeder), std::move(stream)}, std::move(caf.conveyor)}; +} + +} From 752ab4d838315802289e7e1f277ea0842c222812 Mon Sep 17 00:00:00 2001 From: Claudius Holeksa Date: Sun, 13 Mar 2022 14:42:34 +0100 Subject: [PATCH 2/2] renamed io peer files --- source/forstio/{io_wrapper.h => io_peer.h} | 2 +- source/forstio/{io_wrapper.tmpl.h => io_peer.tmpl.h} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename source/forstio/{io_wrapper.h => io_peer.h} (98%) rename source/forstio/{io_wrapper.tmpl.h => io_peer.tmpl.h} (100%) diff --git a/source/forstio/io_wrapper.h b/source/forstio/io_peer.h similarity index 98% rename from source/forstio/io_wrapper.h rename to source/forstio/io_peer.h index 8cab7ac..62b0ef1 100644 --- a/source/forstio/io_wrapper.h +++ b/source/forstio/io_peer.h @@ -39,4 +39,4 @@ std::pair