1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
namespace saw {
/**
 * Convenience constructor.
 *
 * Delegates to the five-argument constructor, supplying a value-initialized
 * codec and value-initialized input/output buffers.
 *
 * @param feed feeder handed on to the delegated constructor
 * @param str  stream handed on to the delegated constructor
 */
template <typename Codec, typename Incoming, typename Outgoing,
          typename InContainer, typename OutContainer, typename BufferT>
streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer,
                  BufferT>::
    streaming_io_peer(
        own<conveyor_feeder<heap_message_root<Incoming, InContainer>>> feed,
        own<async_io_stream> str)
    : streaming_io_peer{std::move(feed), std::move(str),
                        Codec{},   // codec
                        BufferT{}, // incoming buffer
                        BufferT{}  // outgoing buffer
      } {}
/**
 * Primary constructor.
 *
 * Stores the feeder, stream, codec and both buffers, installs the handlers
 * for read and write completion, and issues the first read on the stream.
 *
 * @param feed   feeder that receives each successfully decoded message and,
 *               if the read pipeline reports an error, a copy of that error
 * @param stream stream the peer reads from and writes to
 * @param codec  codec stored in codec_ and used to decode incoming data
 * @param in     buffer that incoming bytes are written into
 * @param out    buffer that outgoing bytes are read from
 *
 * NOTE(review): the initializers of sink_read_ and sink_write_ dereference
 * io_stream_, so this relies on io_stream_ being declared before them in the
 * class (declaration not visible here) - confirm.
 * NOTE(review): both handlers capture `this`; presumably the peer must not be
 * relocated while the sinks are alive - confirm.
 */
template <typename Codec, typename Incoming, typename Outgoing,
          typename InContainer, typename OutContainer, typename BufferT>
streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer,
                  BufferT>::
    streaming_io_peer(
        own<conveyor_feeder<heap_message_root<Incoming, InContainer>>> feed,
        own<async_io_stream> stream, Codec codec, BufferT in, BufferT out)
    : incoming_feeder_{std::move(feed)},
      io_stream_{std::move(stream)}, codec_{std::move(codec)},
      in_buffer_{std::move(in)}, out_buffer_{std::move(out)},
      sink_read_{
          io_stream_->read_done()
              .then([this](size_t bytes) -> error_or<void> {
                  // Account for the bytes reported by the completed read.
                  in_buffer_.write_advance(bytes);

                  // No writable segment left: treated as an oversized
                  // message and reported as a critical error.
                  if (in_buffer_.write_segment_length() == 0) {
                      return critical_error("Message too long");
                  }

                  // Queue the next read before decoding. The `1` is
                  // presumably the minimum byte count - TODO confirm against
                  // async_io_stream::read.
                  io_stream_->read(&in_buffer_.write(), 1,
                                   in_buffer_.write_segment_length());

                  // Decode messages until the codec reports a failure.
                  while (true) {
                      auto root = heap_message_root<Incoming, InContainer>();
                      auto builder = root.build();

                      error err =
                          codec_.template decode<Incoming, InContainer>(
                              builder, in_buffer_);
                      // A critical error aborts the handler and is passed on
                      // to the error handler below.
                      if (err.is_critical()) {
                          return err;
                      }

                      if (!err.failed()) {
                          incoming_feeder_->feed(std::move(root));
                      } else {
                          // Non-critical failure: stop decoding for now
                          // (presumably not enough data yet - confirm).
                          break;
                      }
                  }

                  return void_t{};
              })
              .sink([this](error err) {
                  // Hand a copy of the error to the feeder, then return the
                  // original error.
                  incoming_feeder_->fail(err.copy_error());

                  return err;
              })},
      sink_write_{io_stream_->write_done()
                      .then([this](size_t bytes) -> error_or<void> {
                          // Drop the bytes reported as written.
                          out_buffer_.read_advance(bytes);

                          // Keep writing while data remains. The accessor
                          // is spelled in snake_case like every other
                          // BufferT call in this file.
                          if (out_buffer_.read_composite_length() > 0) {
                              io_stream_->write(
                                  &out_buffer_.read(),
                                  out_buffer_.read_segment_length());
                          }

                          return void_t{};
                      })
                      .sink()} {
    // Issue the initial read; the read handler above re-issues it afterwards.
    io_stream_->read(&in_buffer_.write(), 1, in_buffer_.write_segment_length());
}
/**
 * Encodes a message into the outgoing buffer and, when that buffer had no
 * readable bytes beforehand, starts a write on the stream.
 *
 * @param msg message whose reader is passed to the codec's encode
 * @return the codec's error if encoding failed, otherwise no_error()
 */
template <typename Codec, typename Incoming, typename Outgoing,
          typename InContainer, typename OutContainer, typename BufferT>
error streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer,
                        BufferT>::send(heap_message_root<Outgoing, OutContainer>
                                           msg) {
    // Must be sampled before encoding, since encoding targets out_buffer_.
    const bool buffer_was_empty = (out_buffer_.read_segment_length() == 0);

    error encode_status =
        codec_.template encode<Outgoing, OutContainer>(msg.read(), out_buffer_);
    if (encode_status.failed()) {
        return encode_status;
    }

    // Readable data was already present: no new write is started here. The
    // write-completion handler installed by the constructor re-issues writes
    // while data remains.
    if (!buffer_was_empty) {
        return no_error();
    }

    io_stream_->write(&out_buffer_.read(), out_buffer_.read_segment_length());
    return no_error();
}
template <typename Codec, typename Incoming, typename Outgoing,
typename InContainer, typename OutContainer, typename BufferT>
conveyor<void>
streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer,
BufferT>::on_read_disconnected() {
return io_stream_->on_read_disconnected();
}
/**
 * Factory: creates a conveyor/feeder pair for incoming messages, builds a
 * heap-allocated streaming_io_peer from the feeder and the given stream, and
 * returns the peer together with the conveyor.
 *
 * @param stream stream handed to the new peer
 * @return pair of (owned peer, conveyor of incoming message roots)
 */
template <typename Codec, typename Incoming, typename Outgoing,
          typename InContainer, typename OutContainer, typename BufferT>
std::pair<own<streaming_io_peer<Codec, Incoming, Outgoing, InContainer,
                                OutContainer, BufferT>>,
          conveyor<heap_message_root<Incoming, InContainer>>>
newstreaming_io_peer(own<async_io_stream> stream) {
    using message_t = heap_message_root<Incoming, InContainer>;
    using peer_t = streaming_io_peer<Codec, Incoming, Outgoing, InContainer,
                                     OutContainer, BufferT>;

    auto channel = new_conveyor_and_feeder<message_t>();

    // The peer takes the feeding end; the caller gets the consuming end.
    auto peer = heap<peer_t>(std::move(channel.feeder), std::move(stream));

    return {std::move(peer), std::move(channel.conveyor)};
}
} // namespace saw
|