summaryrefslogtreecommitdiff
path: root/modules/io_codec/c++/io_peer.tmpl.hpp
blob: 7329eb223de3d144d2c6409e9319a4050cd4ace4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
namespace saw {

template <typename Incoming, typename Outgoing, typename TransportEncoding, typename ContentEncoding,
		  typename BufferT>
streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding,
				  BufferT>::
	streaming_io_peer(
		own<conveyor_feeder<data<Incoming, ContentEncoding>>> feed,
		own<async_io_stream> str)
	: streaming_io_peer{std::move(feed), std::move(str), {}, {}, {}, {}} {}

template <typename Incoming, typename Outgoing, typename TransportEncoding, typename ContentEncoding, typename BufferT>
streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding,
				  BufferT>::
	streaming_io_peer(
		own<conveyor_feeder<data<Incoming, ContentEncoding>>> feed,
		own<async_io_stream> stream, transport<TransportEncoding> in_codec, BufferT in, BufferT out)
	: incoming_feeder_{std::move(feed)},
	  io_stream_{std::move(stream)}, 
		in_codec_{std::move(in_codec)},
	  in_buffer_{std::move(in)}, out_buffer_{std::move(out)},
	  sink_read_{
		  io_stream_->read_done()
			  .then([this](size_t bytes) -> error_or<void> {
				  in_buffer_.write_advance(bytes);

				  if (in_buffer_.write_segment_length() == 0) {
					  return make_error<err::critical>("Message too long");
				  }

				  io_stream_->read(&in_buffer_.write(), 1,
								   in_buffer_.write_segment_length());

				  while (true) {
						auto eov = in_codec_.view_slice(in_buffer_);
						if(eov.is_error()){
							auto& err = eov.get_error();

							if(err.template is_type<err::buffer_exhausted>()){
								break;
							}
							return std::move(err);
						}
						auto& view_val = eov.get_value();
						/**
						 * Allocate buffer for the advertised length
						 */
						own<buffer> in_buff = heap<array_buffer>(view_val.read_composite_length());
						auto eo_len = in_buff->write_from(view_val);
						if(eo_len.is_error()){
							return std::move(eo_len.get_error());
						}
						auto& len_val = eo_len.get_value();
						in_buffer->write_advance(len_val);

						data<Incoming, ContentEncoding> in_data{std::move(array_buffer)};
						incoming_feeder_->feed(std::move(in_data));
				  }

				  return void_t{};
			  })
			  .sink([this](error err) {
				  incoming_feeder_->fail(err.copy_error());

				  return err;
			  })},
	  sink_write_{io_stream_->write_done()
					  .then([this](size_t bytes) -> error_or<void> {
						  out_buffer_.read_advance(bytes);
						  if (out_buffer_.read_composite_length() > 0) {
							  io_stream_->write(
								  &out_buffer_.read(),
								  out_buffer_.read_segment_length());
						  }

						  return void_t{};
					  })
					  .sink()} {
	io_stream_->read(&in_buffer_.write(), 1, in_buffer_.write_segment_length());
}

template <typename Incoming, typename Outgoing, typename TransportEncoding,
		  typename ContentEncoding, typename BufferT>
error_or<void> streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding,
						BufferT>::send(data<Outgoing, ContentEncoding>
										   msg) {
	bool restart_write = out_buffer_.read_segment_length() == 0;

	if (eov.is_error()) {
		return eov;
	}

	if (restart_write) {
		io_stream_->write(&out_buffer_.read(),
						  out_buffer_.read_segment_length());
	}

	return void_t{};
}

template <typename Incoming, typename Outgoing, typename TransportEncoding, typename ContentEncoding,
		  typename BufferT>
conveyor<void>
streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding,
				  BufferT>::on_read_disconnected() {
	return io_stream_->on_read_disconnected();
}

template <typename Incoming, typename Outgoing, typename TransportEncoding, typename ContentEncoding, typename BufferT>
std::pair<own<streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, BufferT>>,
		  conveyor<data<Incoming,ContentEncoding>>>
new_streaming_io_peer(own<async_io_stream> stream) {
	auto caf =
		new_conveyor_and_feeder<data<Incoming, ContentEncoding>>();

	return {heap<streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding,BufferT>>(
				std::move(caf.feeder), std::move(stream)),
			std::move(caf.conveyor)};
}

} // namespace saw