summaryrefslogtreecommitdiff
path: root/modules/io_codec/c++/io_peer.tmpl.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'modules/io_codec/c++/io_peer.tmpl.hpp')
-rw-r--r--modules/io_codec/c++/io_peer.tmpl.hpp46
1 files changed, 22 insertions, 24 deletions
diff --git a/modules/io_codec/c++/io_peer.tmpl.hpp b/modules/io_codec/c++/io_peer.tmpl.hpp
index 28bbbfb..e1863fc 100644
--- a/modules/io_codec/c++/io_peer.tmpl.hpp
+++ b/modules/io_codec/c++/io_peer.tmpl.hpp
@@ -9,9 +9,8 @@ streaming_io_peer<Incoming, Outgoing, Encoding,
own<async_io_stream> str)
: streaming_io_peer{std::move(feed), std::move(str), {}, {}, {}} {}
-template <typename Codec, typename Incoming, typename Outgoing,
- typename InContainer, typename OutContainer, typename BufferT>
-streaming_io_peer<Codec, Incoming, Outgoing, Encoding,
+template <typename Incoming, typename Outgoing, typename Encoding, typename BufferT>
+streaming_io_peer<Incoming, Outgoing, Encoding,
BufferT>::
streaming_io_peer(
own<conveyor_feeder<data<Incoming, Encoding>>> feed,
@@ -27,7 +26,7 @@ streaming_io_peer<Codec, Incoming, Outgoing, Encoding,
in_buffer_.write_advance(bytes);
if (in_buffer_.write_segment_length() == 0) {
- return critical_error("Message too long");
+ return make_error<err::critical>("Message too long");
}
io_stream_->read(&in_buffer_.write(), 1,
@@ -74,16 +73,18 @@ streaming_io_peer<Codec, Incoming, Outgoing, Encoding,
}
template <typename Incoming, typename Outgoing,
- typename InContainer, typename OutContainer, typename BufferT>
-error_or<void> streaming_io_peer<Codec, Incoming, Outgoing,
- BufferT>::send(heap_message_root<Outgoing, OutContainer>
+ typename Encoding, typename BufferT>
+error_or<void> streaming_io_peer<Incoming, Outgoing, Encoding,
+ BufferT>::send(data<Outgoing>
msg) {
bool restart_write = out_buffer_.read_segment_length() == 0;
- error err =
- codec_.template encode<Outgoing, OutContainer>(msg.read(), out_buffer_);
- if (err.failed()) {
- return err;
+ data<Outgoing, Encoding> enc;
+
+ auto eov =
+ out_codec_.encode(msg, enc);//msg.read(), out_buffer_);
+ if (eov.is_error()) {
+ return eov;
}
if (restart_write) {
@@ -91,28 +92,25 @@ error_or<void> streaming_io_peer<Codec, Incoming, Outgoing,
out_buffer_.read_segment_length());
}
- return no_error();
+ return void_t{};
}
-template <typename Codec, typename Incoming, typename Outgoing,
- typename InContainer, typename OutContainer, typename BufferT>
+template <typename Incoming, typename Outgoing, typename Encoding,
+ typename BufferT>
conveyor<void>
-streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer,
+streaming_io_peer<Incoming, Outgoing, Encoding,
BufferT>::on_read_disconnected() {
return io_stream_->on_read_disconnected();
}
-template <typename Codec, typename Incoming, typename Outgoing,
- typename InContainer, typename OutContainer, typename BufferT>
-std::pair<own<streaming_io_peer<Codec, Incoming, Outgoing, InContainer,
- OutContainer, BufferT>>,
- conveyor<heap_message_root<Incoming, InContainer>>>
-newstreaming_io_peer(own<async_io_stream> stream) {
+template <typename Codec, typename Incoming, typename Outgoing, typename BufferT>
+std::pair<own<streaming_io_peer<Codec, Incoming, Outgoing, BufferT>>,
+ conveyor<data<Incoming>>>
+new_streaming_io_peer(own<async_io_stream> stream) {
auto caf =
- new_conveyor_and_feeder<heap_message_root<Incoming, InContainer>>();
+ new_conveyor_and_feeder<data<Incoming>>();
- return {heap<streaming_io_peer<Codec, Incoming, Outgoing, InContainer,
- OutContainer, BufferT>>(
+ return {heap<streaming_io_peer<Codec, Incoming, Outgoing, BufferT>>(
std::move(caf.feeder), std::move(stream)),
std::move(caf.conveyor)};
}