summaryrefslogtreecommitdiff
path: root/src/io_codec/io_peer.tmpl.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/io_codec/io_peer.tmpl.h')
-rw-r--r--src/io_codec/io_peer.tmpl.h117
1 files changed, 117 insertions, 0 deletions
diff --git a/src/io_codec/io_peer.tmpl.h b/src/io_codec/io_peer.tmpl.h
new file mode 100644
index 0000000..880a58a
--- /dev/null
+++ b/src/io_codec/io_peer.tmpl.h
@@ -0,0 +1,117 @@
+namespace saw {
+
+template <typename Codec, typename Incoming, typename Outgoing,
+ typename InContainer, typename OutContainer, typename BufferT>
+streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer,
+ BufferT>::
+ streaming_io_peer(
+ own<conveyor_feeder<heap_message_root<Incoming, InContainer>>> feed,
+ own<async_io_stream> str)
+ : streaming_io_peer{std::move(feed), std::move(str), {}, {}, {}} {}
+
+template <typename Codec, typename Incoming, typename Outgoing,
+ typename InContainer, typename OutContainer, typename BufferT>
+streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer,
+ BufferT>::
+ streaming_io_peer(
+ own<conveyor_feeder<heap_message_root<Incoming, InContainer>>> feed,
+ own<async_io_stream> stream, Codec codec, BufferT in, BufferT out)
+ : incoming_feeder_{std::move(feed)},
+ io_stream_{std::move(stream)}, codec_{std::move(codec)},
+ in_buffer_{std::move(in)}, out_buffer_{std::move(out)},
+ sink_read_{
+ io_stream_->read_done()
+ .then([this](size_t bytes) -> error_or<void> {
+ in_buffer_.write_advance(bytes);
+
+ if (in_buffer_.write_segment_length() == 0) {
+ return critical_error("Message too long");
+ }
+
+ io_stream_->read(&in_buffer_.write(), 1,
+ in_buffer_.write_segment_length());
+
+ while (true) {
+ auto root = heap_message_root<Incoming, InContainer>();
+ auto builder = root.build();
+
+ error err = codec_.template decode<Incoming, InContainer>(
+ builder, in_buffer_);
+ if (err.is_critical()) {
+ return err;
+ }
+
+ if (!err.failed()) {
+ incoming_feeder_->feed(std::move(root));
+ } else {
+ break;
+ }
+ }
+
+ return void_t{};
+ })
+ .sink([this](error err) {
+ incoming_feeder_->fail(err.copy_error());
+
+ return err;
+ })},
+ sink_write_{io_stream_->write_done()
+ .then([this](size_t bytes) -> error_or<void> {
+ out_buffer_.read_advance(bytes);
+ if (out_buffer_.readCompositeLength() > 0) {
+ io_stream_->write(
+ &out_buffer_.read(),
+ out_buffer_.read_segment_length());
+ }
+
+ return void_t{};
+ })
+ .sink()} {
+ io_stream_->read(&in_buffer_.write(), 1, in_buffer_.write_segment_length());
+}
+
+template <typename Codec, typename Incoming, typename Outgoing,
+ typename InContainer, typename OutContainer, typename BufferT>
+error streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer,
+ BufferT>::send(heap_message_root<Outgoing, OutContainer>
+ msg) {
+ bool restart_write = out_buffer_.read_segment_length() == 0;
+
+ error err =
+ codec_.template encode<Outgoing, OutContainer>(msg.read(), out_buffer_);
+ if (err.failed()) {
+ return err;
+ }
+
+ if (restart_write) {
+ io_stream_->write(&out_buffer_.read(),
+ out_buffer_.read_segment_length());
+ }
+
+ return no_error();
+}
+
+template <typename Codec, typename Incoming, typename Outgoing,
+ typename InContainer, typename OutContainer, typename BufferT>
+conveyor<void>
+streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer,
+ BufferT>::on_read_disconnected() {
+ return io_stream_->on_read_disconnected();
+}
+
+template <typename Codec, typename Incoming, typename Outgoing,
+ typename InContainer, typename OutContainer, typename BufferT>
+std::pair<own<streaming_io_peer<Codec, Incoming, Outgoing, InContainer,
+ OutContainer, BufferT>>,
+ conveyor<heap_message_root<Incoming, InContainer>>>
+newstreaming_io_peer(own<async_io_stream> stream) {
+ auto caf =
+ new_conveyor_and_feeder<heap_message_root<Incoming, InContainer>>();
+
+ return {heap<streaming_io_peer<Codec, Incoming, Outgoing, InContainer,
+ OutContainer, BufferT>>(
+ std::move(caf.feeder), std::move(stream)),
+ std::move(caf.conveyor)};
+}
+
+} // namespace saw