114 lines
3.8 KiB
C++
114 lines
3.8 KiB
C++
namespace saw {
|
|
|
|
template <typename Codec, typename Incoming, typename Outgoing,
|
|
typename InContainer, typename OutContainer, typename BufferT>
|
|
streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer, BufferT>::
|
|
streaming_io_peer(
|
|
own<conveyor_feeder<HeapMessageRoot<Incoming, InContainer>>> feed,
|
|
own<async_io_stream> str)
|
|
: streaming_io_peer{std::move(feed), std::move(str), {}, {}, {}} {}
|
|
|
|
template <typename Codec, typename Incoming, typename Outgoing,
|
|
typename InContainer, typename OutContainer, typename BufferT>
|
|
streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer, BufferT>::
|
|
streaming_io_peer(
|
|
own<conveyor_feeder<HeapMessageRoot<Incoming, InContainer>>> feed,
|
|
own<async_io_stream> stream, Codec codec, BufferT in, BufferT out)
|
|
: incoming_feeder_{std::move(feed)},
|
|
io_stream_{std::move(stream)}, codec_{std::move(codec)},
|
|
in_buffer_{std::move(in)}, out_buffer_{std::move(out)},
|
|
sink_read_{io_stream_->read_done()
|
|
.then([this](size_t bytes) -> error_or<void> {
|
|
in_buffer_.write_advance(bytes);
|
|
|
|
if (in_buffer_.write_segment_length() == 0) {
|
|
return critical_error("Message too long");
|
|
}
|
|
|
|
io_stream_->read(&in_buffer_.write(), 1,
|
|
in_buffer_.write_segment_length());
|
|
|
|
while (true) {
|
|
auto root =
|
|
heapMessageRoot<Incoming, InContainer>();
|
|
auto builder = root.build();
|
|
|
|
error err =
|
|
codec_.template decode<Incoming, InContainer>(
|
|
builder, in_buffer_);
|
|
if (err.is_critical()) {
|
|
return err;
|
|
}
|
|
|
|
if (!err.failed()) {
|
|
incoming_feeder_->feed(std::move(root));
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
|
|
return void_t{};
|
|
})
|
|
.sink([this](error err) {
|
|
incoming_feeder_->fail(err.copy_error());
|
|
|
|
return err;
|
|
})},
|
|
sink_write_{io_stream_->write_done()
|
|
.then([this](size_t bytes) -> error_or<void> {
|
|
out_buffer_.read_advance(bytes);
|
|
if (out_buffer_.readCompositeLength() > 0) {
|
|
io_stream_->write(
|
|
&out_buffer_.read(),
|
|
out_buffer_.read_segment_length());
|
|
}
|
|
|
|
return void_t{};
|
|
})
|
|
.sink()} {
|
|
io_stream_->read(&in_buffer_.write(), 1, in_buffer_.write_segment_length());
|
|
}
|
|
|
|
template <typename Codec, typename Incoming, typename Outgoing,
|
|
typename InContainer, typename OutContainer, typename BufferT>
|
|
error streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer,
|
|
BufferT>::send(HeapMessageRoot<Outgoing, OutContainer>
|
|
msg) {
|
|
bool restart_write = out_buffer_.read_segment_length() == 0;
|
|
|
|
error err =
|
|
codec_.template encode<Outgoing, OutContainer>(msg.read(), out_buffer_);
|
|
if (err.failed()) {
|
|
return err;
|
|
}
|
|
|
|
if (restart_write) {
|
|
io_stream_->write(&out_buffer_.read(), out_buffer_.read_segment_length());
|
|
}
|
|
|
|
return no_error();
|
|
}
|
|
|
|
template <typename Codec, typename Incoming, typename Outgoing,
|
|
typename InContainer, typename OutContainer, typename BufferT>
|
|
conveyor<void> streaming_io_peer<Codec, Incoming, Outgoing, InContainer,
|
|
OutContainer, BufferT>::on_read_disconnected() {
|
|
return io_stream_->on_read_disconnected();
|
|
}
|
|
|
|
template <typename Codec, typename Incoming, typename Outgoing,
|
|
typename InContainer, typename OutContainer, typename BufferT>
|
|
std::pair<own<streaming_io_peer<Codec, Incoming, Outgoing, InContainer,
|
|
OutContainer, BufferT>>,
|
|
conveyor<HeapMessageRoot<Incoming, InContainer>>>
|
|
newstreaming_io_peer(own<async_io_stream> stream) {
|
|
auto caf = newconveyorAndFeeder<HeapMessageRoot<Incoming, InContainer>>();
|
|
|
|
return {heap<streaming_io_peer<Codec, Incoming, Outgoing, InContainer,
|
|
OutContainer, BufferT>>(std::move(caf.feeder),
|
|
std::move(stream)),
|
|
std::move(caf.conveyor)};
|
|
}
|
|
|
|
} // namespace saw
|