diff --git a/source/forstio/io.h b/source/forstio/io.h index 190ca53..1e9eeba 100644 --- a/source/forstio/io.h +++ b/source/forstio/io.h @@ -61,7 +61,7 @@ public: virtual Conveyor writeDone() = 0; }; -class AsyncIoStream : public AsyncInputStream, public AsyncOutputStream { +class AsyncIoStream final : public AsyncInputStream, public AsyncOutputStream { private: Own stream; @@ -75,6 +75,9 @@ private: public: AsyncIoStream(Own str); + SAW_FORBID_COPY(AsyncIoStream); + SAW_FORBID_MOVE(AsyncIoStream); + void read(void *buffer, size_t length, size_t max_length) override; Conveyor readDone() override; diff --git a/source/forstio/io_peer.h b/source/forstio/io_peer.h new file mode 100644 index 0000000..62b0ef1 --- /dev/null +++ b/source/forstio/io_peer.h @@ -0,0 +1,42 @@ +#pragma once + +#include "async.h" +#include "message.h" +#include "io.h" + +namespace saw { + +template , typename OutContainer = MessageContainer, typename BufferT = RingBuffer> +class StreamingIoPeer { +private: + Own>> incoming_feeder = nullptr; + + Own io_stream; + + Codec codec; + + BufferT in_buffer; + BufferT out_buffer; + + SinkConveyor sink_read; + SinkConveyor sink_write; + +public: + StreamingIoPeer(Own>> feed, Own stream, Codec codec, BufferT in, BufferT out); + StreamingIoPeer(Own>> feed, Own stream); + + void send(HeapMessageRoot builder); + + Conveyor onReadDisconnected(); +}; + +/** + * Setup new streaming io peer with the provided network protocols. + * This is a convenience wrapper intended for a faster setup of + */ +template , typename OutContainer = MessageContainer, typename BufferT = RingBuffer> +std::pair, Conveyor>> newStreamingIoPeer(Own stream); + +} // namespace saw + +#include "io_peer.tmpl.h" diff --git a/source/forstio/io_peer.tmpl.h b/source/forstio/io_peer.tmpl.h new file mode 100644 index 0000000..04cf16b --- /dev/null +++ b/source/forstio/io_peer.tmpl.h @@ -0,0 +1,96 @@ +namespace saw { + +template , typename OutContainer = MessageContainer, typename BufferT = RingBuffer> +StreamingIoPeer::StreamingIoPeer( +Own>> feed, +Own str +): + StreamingIoPeer{std::move(feed), std::move(str), {}, {}, {}} +{ +} + +template , typename OutContainer = MessageContainer, typename BufferT = RingBuffer> +StreamingIoPeer::StreamingIoPeer( +Own>> feed, +Own str, Codec codec, BufferT in, BufferT out): + incoming_feeder{std::move(feed)}, + io_stream{std::move(str)}, + codec{std::move(codec)}, + in_buffer{std::move(in)}, + out_buffer{std::move(out)}, + sink_read{ + io_stream->readDone().then([this](size_t bytes) -> ErrorOr { + in_buffer.writeAdvance(bytes); + + if(in_buffer.writeSegmentLength() == 0){ + return criticalError("Message too long"); + } + + io_stream->read(&in_buffer.write(), 1, in_buffer.writeSegmentLength()); + + while(true){ + auto root = heapMessageRoot(); + auto builder = root.build(); + + Error error = codec.template decode(builder, in_buffer); + if(error.isCritical()){ + return error; + } + + if(!error.failed()){ + incoming_feeder->handle(std::move(root)); + }else{ + break; + } + } + + return Void{}; + }).sink([this](Error error){ + incoming_feeder->fail(error); + + return error; + }) + }, + sink_write{ + io_stream->writeDone().then([this](size_bytes) -> ErrorOr { + out_buffer.readAdvance(bytes); + if(out_buffer.readCompositeLength() > 0){ + io_stream->write(&out_buffer.read(), out_buffer.readSegmengtLength()); + } + + return Void{}; + }).sink(); + } +{ + io_stream->read(&in_buffer.write(), 1, in_buffer.writeSegmentLength()); +} + +template , typename OutContainer = MessageContainer, typename BufferT = RingBuffer> +void StreamingIoPeer::send(HeapMessageRoot msg){ + bool restart_write = out_buffer.readSegmentLength() == 0; + + Error error = codec.template encode(msg.read(), out_buffer); + if(error.failed()){ + return error; + } + + if(restart_write){ + io_stream->write(&out_buffer.read(), out_buffer.readSegmentLength()); + } + + return noError(); +} + +template , typename OutContainer = MessageContainer, typename BufferT = RingBuffer> +Conveyor StreamingIoPeer::onReadDisconnected(){ + return io_stream->onReadDisconnected(); +} + +template , typename OutContainer = MessageContainer, typename BufferT = RingBuffer> +std::pair, Conveyor>> newStreamingIoPeer(Own stream){ + auto caf = newConveyorAndFeeder>(); + + return {{std::move(caf.feeder), std::move(stream)}, std::move(caf.conveyor)}; +} + +} diff --git a/source/forstio/io_wrapper.h b/source/forstio/io_wrapper.h deleted file mode 100644 index 66588fb..0000000 --- a/source/forstio/io_wrapper.h +++ /dev/null @@ -1,27 +0,0 @@ -#pragma once - -#include "async.h" -#include "message.h" -#include "io.h" - -namespace saw { - -template , class OutContainer = MessageContainer> -class StreamingIoPeer { -private: - Codec codec; - - Own io_stream; - - Own>> incoming_feeder = nullptr; -public: - StreamingIoPeer(Own stream); - - void send(HeapMessageRoot builder); - - Conveyor startReadPump(); -}; - - - -} // namespace saw