summaryrefslogtreecommitdiff
path: root/modules/io_codec/io_peer.hpp
blob: 46a7d11500eaa281050387e6c7b8dbac9e7e0f7b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#pragma once

#include <forstio/async/async.h>
#include <forstio/buffer.h>
#include <forsto/io/io.h>
#include <forstio/schema/message.h>

namespace saw {

template <typename Codec, typename Incoming, typename Outgoing,
		  typename InContainer = message_container<Incoming>,
		  typename OutContainer = message_container<Outgoing>,
		  typename BufferT = ring_buffer>
class streaming_io_peer {
public:
	/**
	 *
	 */
	streaming_io_peer(
		own<conveyor_feeder<heap_message_root<Incoming, InContainer>>> feed,
		own<async_io_stream> stream, Codec codec, BufferT in, BufferT out);
	/**
	 *
	 */
	streaming_io_peer(
		own<conveyor_feeder<heap_message_root<Incoming, InContainer>>> feed,
		own<async_io_stream> stream);

	/**
	 * Deleted copy and move constructors
	 */
	SAW_FORBID_COPY(streaming_io_peer);
	SAW_FORBID_MOVE(streaming_io_peer);

	/**
	 * Send a message to the remote peer
	 */
	error send(heap_message_root<Outgoing, OutContainer> builder);

	/**
	 * A phantom conveyor feeder. Meant for interfacing with other components
	 */
	conveyor_feeder<heap_message_root<Outgoing, OutContainer>> &feeder();

	conveyor<void> on_read_disconnected();

private:
	/// @unimplemented
	class peer_conveyor_feeder final
		: public conveyor_feeder<heap_message_root<Outgoing, OutContainer>> {
	public:
		peer_conveyor_feeder(
			streaming_io_peer<Codec, Incoming, Outgoing, InContainer,
							  OutContainer, BufferT> &peer_)
			: peer_{peer_} {}

		void feed(heap_message_root<Outgoing, OutContainer> &&data) override {
			(void)data;
		}

		void fail(error &&error) override { (void)error; }

		size_t space() const override { return 0; }

		size_t queued() const override { return 0; }

	private:
		streaming_io_peer<Codec, Incoming, Outgoing, InContainer, OutContainer,
						  BufferT> &peer_;
	};

private:
	own<conveyor_feeder<heap_message_root<Incoming, InContainer>>>
		incoming_feeder_ = nullptr;

	own<async_io_stream> io_stream_;

	Codec codec_;

	BufferT in_buffer_;
	BufferT out_buffer_;

	conveyor_sink sink_read_;
	conveyor_sink sink_write_;

	peer_conveyor_feeder conveyor_feeder_;
};

/**
 * Setup new streaming io peer with the provided network protocols.
 * This is a convenience wrapper intended for a faster setup of this class
 */
template <typename Codec, typename Incoming, typename Outgoing,
		  typename InContainer = message_container<Incoming>,
		  typename OutContainer = message_container<Outgoing>,
		  typename BufferT = ring_buffer>
std::pair<own<streaming_io_peer<Codec, Incoming, Outgoing, InContainer,
								OutContainer, BufferT>>,
		  conveyor<heap_message_root<Incoming, InContainer>>>
new_streaming_io_peer(own<async_io_stream> stream);

} // namespace saw

#include "io_peer.tmpl.hpp