diff options
Diffstat (limited to 'modules/io_codec/io_peer.tmpl.hpp')
-rw-r--r-- | modules/io_codec/io_peer.tmpl.hpp | 117 |
1 files changed, 117 insertions, 0 deletions
diff --git a/modules/io_codec/io_peer.tmpl.hpp b/modules/io_codec/io_peer.tmpl.hpp new file mode 100644 index 0000000..880a58a --- /dev/null +++ b/modules/io_codec/io_peer.tmpl.hpp @@ -0,0 +1,117 @@ +namespace saw { + +template <typename Codec, typename Incoming, typename Outgoing, + typename InContainer, typename OutContainer, typename BufferT> +streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer, + BufferT>:: + streaming_io_peer( + own<conveyor_feeder<heap_message_root<Incoming, InContainer>>> feed, + own<async_io_stream> str) + : streaming_io_peer{std::move(feed), std::move(str), {}, {}, {}} {} + +template <typename Codec, typename Incoming, typename Outgoing, + typename InContainer, typename OutContainer, typename BufferT> +streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer, + BufferT>:: + streaming_io_peer( + own<conveyor_feeder<heap_message_root<Incoming, InContainer>>> feed, + own<async_io_stream> stream, Codec codec, BufferT in, BufferT out) + : incoming_feeder_{std::move(feed)}, + io_stream_{std::move(stream)}, codec_{std::move(codec)}, + in_buffer_{std::move(in)}, out_buffer_{std::move(out)}, + sink_read_{ + io_stream_->read_done() + .then([this](size_t bytes) -> error_or<void> { + in_buffer_.write_advance(bytes); + + if (in_buffer_.write_segment_length() == 0) { + return critical_error("Message too long"); + } + + io_stream_->read(&in_buffer_.write(), 1, + in_buffer_.write_segment_length()); + + while (true) { + auto root = heap_message_root<Incoming, InContainer>(); + auto builder = root.build(); + + error err = codec_.template decode<Incoming, InContainer>( + builder, in_buffer_); + if (err.is_critical()) { + return err; + } + + if (!err.failed()) { + incoming_feeder_->feed(std::move(root)); + } else { + break; + } + } + + return void_t{}; + }) + .sink([this](error err) { + incoming_feeder_->fail(err.copy_error()); + + return err; + })}, + sink_write_{io_stream_->write_done() + .then([this](size_t bytes) -> error_or<void> { + out_buffer_.read_advance(bytes); + if (out_buffer_.readCompositeLength() > 0) { + io_stream_->write( + &out_buffer_.read(), + out_buffer_.read_segment_length()); + } + + return void_t{}; + }) + .sink()} { + io_stream_->read(&in_buffer_.write(), 1, in_buffer_.write_segment_length()); +} + +template <typename Codec, typename Incoming, typename Outgoing, + typename InContainer, typename OutContainer, typename BufferT> +error streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer, + BufferT>::send(heap_message_root<Outgoing, OutContainer> + msg) { + bool restart_write = out_buffer_.read_segment_length() == 0; + + error err = + codec_.template encode<Outgoing, OutContainer>(msg.read(), out_buffer_); + if (err.failed()) { + return err; + } + + if (restart_write) { + io_stream_->write(&out_buffer_.read(), + out_buffer_.read_segment_length()); + } + + return no_error(); +} + +template <typename Codec, typename Incoming, typename Outgoing, + typename InContainer, typename OutContainer, typename BufferT> +conveyor<void> +streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer, + BufferT>::on_read_disconnected() { + return io_stream_->on_read_disconnected(); +} + +template <typename Codec, typename Incoming, typename Outgoing, + typename InContainer, typename OutContainer, typename BufferT> +std::pair<own<streaming_io_peer<Codec, Incoming, Outgoing, InContainer, + OutContainer, BufferT>>, + conveyor<heap_message_root<Incoming, InContainer>>> +newstreaming_io_peer(own<async_io_stream> stream) { + auto caf = + new_conveyor_and_feeder<heap_message_root<Incoming, InContainer>>(); + + return {heap<streaming_io_peer<Codec, Incoming, Outgoing, InContainer, + OutContainer, BufferT>>( + std::move(caf.feeder), std::move(stream)), + std::move(caf.conveyor)}; +} + +} // namespace saw |