// Out-of-line definitions for streaming_io_peer and its factory function
// new_streaming_io_peer, all inside namespace saw.
//
// streaming_io_peer(feed, str)
//   Delegating constructor: forwards feed and str to the six-argument
//   constructor and passes `{}` for both codecs and both buffers.
//
// streaming_io_peer(feed, stream, in_codec, out_codec, in, out)
//   Moves every argument into the matching member, then builds two sinks:
//
//   sink_read_  - continuation on io_stream_->read_done(). It
//       1. advances the write position of in_buffer_ by the reported byte
//          count,
//       2. returns the error "Message too long" when in_buffer_ has no
//          writable segment left (write_segment_length() == 0),
//       3. issues the next read into in_buffer_'s write segment,
//       4. wraps in_buffer_ in a buffer_view, builds a data object from it
//          and hands that to incoming_feeder_->feed().
//     The error handler given to .sink() passes a copy of the error to
//     incoming_feeder_->fail() and returns the original error.
//     NOTE(review): the `while (true)` loop around step 4 contains no break
//     or return in the code shown here, so the `return void_t{};` after it
//     looks unreachable and the same view would be fed repeatedly. No
//     read_advance on in_buffer_ is visible either. Confirm whether a
//     decode / termination step is missing.
//
//   sink_write_ - continuation on io_stream_->write_done(). It advances the
//     read position of out_buffer_ by the reported byte count and, while
//     read_composite_length() is still > 0, issues another write of the
//     current read segment.
//
//   The constructor body issues the first read. The literal 1 passed as the
//   second argument of read() is presumably a minimum byte count - TODO
//   confirm against the stream interface.
//   Both lambdas capture `this`; NOTE(review): presumably the peer must not
//   be moved or destroyed while the sinks are alive - verify against the
//   class declaration.
//
// send(msg)
//   Records whether out_buffer_'s current read segment is empty
//   (restart_write), encodes msg into the local `enc` through out_codec_,
//   returns the encode result if it is an error, and otherwise issues a
//   write from out_buffer_ when restart_write was true. Returns void_t{} on
//   success.
//   NOTE(review): `enc` is not used after encode() and nothing visible here
//   appends to out_buffer_. The trailing inline comment suggests an earlier
//   revision encoded straight into out_buffer_. Confirm how encoded bytes
//   are supposed to reach out_buffer_ before the write is issued.
//   NOTE(review): restart_write is computed from read_segment_length(),
//   whereas sink_write_ checks read_composite_length() - confirm the
//   difference is intended.
//
// on_read_disconnected()
//   Returns io_stream_->on_read_disconnected() unchanged.
//
// new_streaming_io_peer(stream)
//   Creates a conveyor/feeder pair via new_conveyor_and_feeder, constructs a
//   streaming_io_peer with heap() from the feeder and the given stream
//   (two-argument constructor), and returns the peer together with the
//   conveyor.
namespace saw { template streaming_io_peer:: streaming_io_peer( own>> feed, own str) : streaming_io_peer{std::move(feed), std::move(str), {}, {}, {}, {}} {} template streaming_io_peer:: streaming_io_peer( own>> feed, own stream, codec in_codec, codec out_codec, BufferT in, BufferT out) : incoming_feeder_{std::move(feed)}, io_stream_{std::move(stream)}, in_codec_{std::move(in_codec)}, out_codec_{std::move(out_codec)}, in_buffer_{std::move(in)}, out_buffer_{std::move(out)}, sink_read_{ io_stream_->read_done() .then([this](size_t bytes) -> error_or { in_buffer_.write_advance(bytes); if (in_buffer_.write_segment_length() == 0) { return make_error("Message too long"); } io_stream_->read(&in_buffer_.write(), 1, in_buffer_.write_segment_length()); while (true) { buffer_view in_view{in_buffer_}; auto in_data = data{in_view}; incoming_feeder_->feed(std::move(in_data)); } return void_t{}; }) .sink([this](error err) { incoming_feeder_->fail(err.copy_error()); return err; })}, sink_write_{io_stream_->write_done() .then([this](size_t bytes) -> error_or { out_buffer_.read_advance(bytes); if (out_buffer_.read_composite_length() > 0) { io_stream_->write( &out_buffer_.read(), out_buffer_.read_segment_length()); } return void_t{}; }) .sink()} { io_stream_->read(&in_buffer_.write(), 1, in_buffer_.write_segment_length()); } template error_or streaming_io_peer::send(data msg) { bool restart_write = out_buffer_.read_segment_length() == 0; data enc; auto eov = out_codec_.encode(msg, enc);//msg.read(), out_buffer_); if (eov.is_error()) { return eov; } if (restart_write) { io_stream_->write(&out_buffer_.read(), out_buffer_.read_segment_length()); } return void_t{}; } template conveyor streaming_io_peer::on_read_disconnected() { return io_stream_->on_read_disconnected(); } template std::pair>, conveyor>> new_streaming_io_peer(own stream) { auto caf = new_conveyor_and_feeder>(); return {heap>( std::move(caf.feeder), std::move(stream)), std::move(caf.conveyor)}; } } // namespace saw