namespace saw { } namespace saw { template streaming_io_peer:: streaming_io_peer( own>> feed, own> str) : streaming_io_peer{std::move(feed), std::move(str), {}} {} template streaming_io_peer:: streaming_io_peer( own>> feed, own> stream, transport in_codec) : incoming_feeder_{std::move(feed)}, io_stream_{std::move(stream)}, in_codec_{std::move(in_codec)}, in_buffer_{}, out_buffer_{}, sink_read_{ io_stream_->read_done() .then([this](size_t bytes) -> error_or { in_buffer_.write_advance(bytes); if (in_buffer_.write_segment_length() == 0) { return make_error("Message too long"); } io_stream_->read(&in_buffer_.write(), 1, in_buffer_.write_segment_length()); while (true) { auto eov = in_codec_.view_slice(in_buffer_); if(eov.is_error()){ auto& err = eov.get_error(); if(err.template is_type()){ break; } return std::move(err); } auto& view_val = eov.get_value(); /** * Allocate buffer for the advertised length */ own in_buff = heap(view_val.read_composite_length()); auto eo_len = in_buff->write_from(view_val); if(eo_len.is_error()){ return std::move(eo_len.get_error()); } auto& len_val = eo_len.get_value(); in_buffer_.read_advance(len_val); in_buff->write_advance(len_val); data in_data{std::move(in_buff)}; incoming_feeder_->feed(std::move(in_data)); } return void_t{}; }) .sink([this](error err) { incoming_feeder_->fail(err.copy_error()); return err; })}, sink_write_{io_stream_->write_done() .then([this](size_t bytes) -> error_or { out_buffer_.read_advance(bytes); if (out_buffer_.read_composite_length() > 0) { io_stream_->write( &out_buffer_.read(), out_buffer_.read_segment_length()); } return void_t{}; },[this](error err){ if(disconnect_feeder_){ disconnect_feeder_->feed(); } return err; }) .sink() }, conveyor_feeder_{ *this } { io_stream_->read(&in_buffer_.write(), 1, in_buffer_.write_segment_length()); } template error_or streaming_io_peer::send(data msg) { bool restart_write = (out_buffer_.read_segment_length() == 0u); auto& msg_buff = msg.get_buffer(); { auto eov = in_codec_.wrap(out_buffer_, msg_buff); if(eov.is_error()){ auto& err = eov.get_error(); return std::move(err); } // auto& len_val = eov.get_value(); } auto eov = out_buffer_.write_from(msg_buff); if (eov.is_error()) { auto& err = eov.get_error(); return std::move(err); } auto& len_val = eov.get_value(); out_buffer_.write_advance(len_val); if (restart_write && out_buffer_.read_segment_length() > 0u) { io_stream_->write(&out_buffer_.read(), out_buffer_.read_segment_length()); } return void_t{}; } template conveyor streaming_io_peer::on_disconnected() { io_read_disconnected_ = io_stream_->on_read_disconnected().then([this](){ if(disconnect_feeder_){ disconnect_feeder_->feed(); } }).sink(); auto caf = new_conveyor_and_feeder(); disconnect_feeder_ = std::move(caf.feeder); return std::move(caf.conveyor); } template std::pair>, conveyor>> new_streaming_io_peer(own> stream) { auto caf = new_conveyor_and_feeder>(); return {heap>( std::move(caf.feeder), std::move(stream)), std::move(caf.conveyor)}; } } // namespace saw