summaryrefslogtreecommitdiff
path: root/modules/io_codec
diff options
context:
space:
mode:
authorClaudius 'keldu' Holeksa <mail@keldu.de>2024-08-03 18:06:50 +0200
committerClaudius 'keldu' Holeksa <mail@keldu.de>2024-08-03 18:06:50 +0200
commite42c3750013e634bbe1834e3e684367e42814b97 (patch)
treeaa398cb6370b02dd3c68a73771f964065f12219d /modules/io_codec
parent738bc442f680bda95667e4fd1ae743c6f6afeab0 (diff)
wip
Diffstat (limited to 'modules/io_codec')
-rw-r--r--modules/io_codec/c++/io_peer.hpp5
-rw-r--r--modules/io_codec/c++/io_peer.tmpl.hpp32
2 files changed, 25 insertions, 12 deletions
diff --git a/modules/io_codec/c++/io_peer.hpp b/modules/io_codec/c++/io_peer.hpp
index 9381754..c4e28a1 100644
--- a/modules/io_codec/c++/io_peer.hpp
+++ b/modules/io_codec/c++/io_peer.hpp
@@ -19,7 +19,7 @@ public:
*/
streaming_io_peer(
own<conveyor_feeder<data<Incoming, ContentEncoding>>> feed,
- own<async_io_stream> stream, codec<Incoming, TransportEncoding> in_codec, codec<Outgoing, TransportEncoding> out_codec, BufferT in, BufferT out);
+ own<async_io_stream> stream, transport<TransportEncoding> in_codec, BufferT in, BufferT out);
/**
* Constructor with mostly default assignements
@@ -77,8 +77,7 @@ private:
own<async_io_stream> io_stream_;
- codec<Incoming, TransportEncoding> in_codec_;
- codec<Outgoing, TransportEncoding> out_codec_;
+ transport<TransportEncoding> in_codec_;
BufferT in_buffer_;
BufferT out_buffer_;
diff --git a/modules/io_codec/c++/io_peer.tmpl.hpp b/modules/io_codec/c++/io_peer.tmpl.hpp
index b7ccb49..7329eb2 100644
--- a/modules/io_codec/c++/io_peer.tmpl.hpp
+++ b/modules/io_codec/c++/io_peer.tmpl.hpp
@@ -14,11 +14,10 @@ streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding,
BufferT>::
streaming_io_peer(
own<conveyor_feeder<data<Incoming, ContentEncoding>>> feed,
- own<async_io_stream> stream, codec<Incoming, TransportEncoding> in_codec, codec<Outgoing, TransportEncoding> out_codec, BufferT in, BufferT out)
+ own<async_io_stream> stream, transport<TransportEncoding> in_codec, BufferT in, BufferT out)
: incoming_feeder_{std::move(feed)},
io_stream_{std::move(stream)},
in_codec_{std::move(in_codec)},
- out_codec_{std::move(out_codec)},
in_buffer_{std::move(in)}, out_buffer_{std::move(out)},
sink_read_{
io_stream_->read_done()
@@ -33,9 +32,28 @@ streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding,
in_buffer_.write_segment_length());
while (true) {
- buffer_view in_view{in_buffer_};
- auto in_data = data<Incoming, TransportEncoding>{in_view};
-
+ auto eov = in_codec_.view_slice(in_buffer_);
+ if(eov.is_error()){
+ auto& err = eov.get_error();
+
+ if(err.template is_type<err::buffer_exhausted>()){
+ break;
+ }
+ return std::move(err);
+ }
+ auto& view_val = eov.get_value();
+ /**
+ * Allocate buffer for the advertised length
+ */
+ own<buffer> in_buff = heap<array_buffer>(view_val.read_composite_length());
+ auto eo_len = in_buff->write_from(view_val);
+ if(eo_len.is_error()){
+ return std::move(eo_len.get_error());
+ }
+ auto& len_val = eo_len.get_value();
+ in_buffer->write_advance(len_val);
+
+ data<Incoming, ContentEncoding> in_data{std::move(array_buffer)};
incoming_feeder_->feed(std::move(in_data));
}
@@ -68,10 +86,6 @@ error_or<void> streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentE
msg) {
bool restart_write = out_buffer_.read_segment_length() == 0;
- data<Outgoing, TransportEncoding> enc;
-
- auto eov =
- out_codec_.encode(msg, enc);//msg.read(), out_buffer_);
if (eov.is_error()) {
return eov;
}