diff options
author | Claudius 'keldu' Holeksa <mail@keldu.de> | 2024-10-16 18:51:50 +0200 |
---|---|---|
committer | Claudius 'keldu' Holeksa <mail@keldu.de> | 2024-10-16 18:51:50 +0200 |
commit | b048b02732cbfcfbb95bb8e16dec71aca0e977f4 (patch) | |
tree | 8f948159473f40ca42c73d75e61d8d03c3b38f24 | |
parent | b10d0b0e1a30eb02777f5a02b81bf45e09749edb (diff) |
Reworked abstraction for network
-rw-r--r-- | modules/io-tls/tls.hpp | 12 | ||||
-rw-r--r-- | modules/io/c++/io.cpp | 53 | ||||
-rw-r--r-- | modules/io/c++/io.hpp | 90 | ||||
-rw-r--r-- | modules/io/c++/io_unix.cpp | 58 | ||||
-rw-r--r-- | modules/io/examples/echo_client.cpp | 6 | ||||
-rw-r--r-- | modules/io/examples/echo_server.cpp | 10 | ||||
-rw-r--r-- | modules/io_codec/c++/io_peer.hpp | 18 | ||||
-rw-r--r-- | modules/io_codec/c++/io_peer.tmpl.hpp | 29 | ||||
-rw-r--r-- | modules/io_codec/examples/peer_echo_client.cpp | 6 | ||||
-rw-r--r-- | modules/io_codec/examples/peer_echo_server.cpp | 6 |
10 files changed, 157 insertions, 131 deletions
diff --git a/modules/io-tls/tls.hpp b/modules/io-tls/tls.hpp index 93ec180..e2202f4 100644 --- a/modules/io-tls/tls.hpp +++ b/modules/io-tls/tls.hpp @@ -7,6 +7,11 @@ #include <variant> namespace saw { +namespace net { +template<typename T = net::Os> +struct Tls {}; +} + class tls; /** @@ -37,6 +42,11 @@ private: options options_; }; -error_or<own<network>> setup_tls_network(network &network); +template<> +class network<net::Tls> { +}; + +template<typename T = net::Os> +error_or<own<network<net::Tls<T>>>> setup_tls_network(network<T> &network); } // namespace saw diff --git a/modules/io/c++/io.cpp b/modules/io/c++/io.cpp index 06c9cbb..50423e1 100644 --- a/modules/io/c++/io.cpp +++ b/modules/io/c++/io.cpp @@ -4,59 +4,6 @@ namespace saw { -async_io_stream::async_io_stream(own<io_stream> str) - : stream_{std::move(str)}, - read_ready_{stream_->read_ready() - .then([this]() { read_stepper_.read_step(*stream_); }) - .sink()}, - write_ready_{stream_->write_ready() - .then([this]() { write_stepper_.write_step(*stream_); }) - .sink()}, - read_disconnected_{stream_->on_read_disconnected() - .then([this]() { - if (read_stepper_.on_read_disconnect) { - read_stepper_.on_read_disconnect->feed(); - } - }) - .sink()} {} - -void async_io_stream::read(void *buffer, size_t min_length, size_t max_length) { - SAW_ASSERT(buffer && max_length >= min_length && min_length > 0) { return; } - - SAW_ASSERT(!read_stepper_.read_task.has_value()) { return; } - - read_stepper_.read_task = read_task_and_step_helper::read_io_task{ - buffer, min_length, max_length, 0}; - read_stepper_.read_step(*stream_); -} - -conveyor<size_t> async_io_stream::read_done() { - auto caf = new_conveyor_and_feeder<size_t>(); - read_stepper_.read_done = std::move(caf.feeder); - return std::move(caf.conveyor); -} - -conveyor<void> async_io_stream::on_read_disconnected() { - auto caf = new_conveyor_and_feeder<void>(); - read_stepper_.on_read_disconnect = std::move(caf.feeder); - return std::move(caf.conveyor); -} - -void async_io_stream::write(const void *buffer, size_t length) { - SAW_ASSERT(buffer && length > 0) { return; } - - SAW_ASSERT(!write_stepper_.write_task.has_value()) { return; } - - write_stepper_.write_task = - write_task_and_step_helper::write_io_task{buffer, length, 0}; - write_stepper_.write_step(*stream_); -} - -conveyor<size_t> async_io_stream::write_done() { - auto caf = new_conveyor_and_feeder<size_t>(); - write_stepper_.write_done = std::move(caf.feeder); - return std::move(caf.conveyor); -} string_network_address::string_network_address(const std::string &address, uint16_t port) diff --git a/modules/io/c++/io.hpp b/modules/io/c++/io.hpp index 390ddcc..fccba7f 100644 --- a/modules/io/c++/io.hpp +++ b/modules/io/c++/io.hpp @@ -55,6 +55,7 @@ public: /* * Io stream */ +template<typename T = net::Os> class io_stream : public input_stream, public output_stream { public: virtual ~io_stream() = default; @@ -79,10 +80,11 @@ public: virtual conveyor<size_t> write_done() = 0; }; +template<typename T = net::Os> class async_io_stream final : public async_input_stream, public async_output_stream { private: - own<io_stream> stream_; + own<io_stream<T>> stream_; conveyor_sink read_ready_; conveyor_sink write_ready_; @@ -92,7 +94,7 @@ private: write_task_and_step_helper write_stepper_; public: - async_io_stream(own<io_stream> str); + async_io_stream(own<io_stream<T>> str); SAW_FORBID_COPY(async_io_stream); SAW_FORBID_MOVE(async_io_stream); @@ -108,19 +110,22 @@ public: conveyor<size_t> write_done() override; }; +template<typename T = net::Os> class server { public: virtual ~server() = default; - virtual conveyor<own<io_stream>> accept() = 0; + virtual conveyor<own<io_stream<T>>> accept() = 0; }; +template<typename T = net::Os> class network_address; /** * Datagram class. Bound to a local address it is able to receive inbound * datagram messages and send them as well as long as an address is provided as * well */ +template<typename T = net::Os> class datagram { public: virtual ~datagram() = default; @@ -129,13 +134,14 @@ public: virtual conveyor<void> read_ready() = 0; virtual error_or<size_t> write(const void *buffer, size_t length, - network_address &dest) = 0; + network_address<T> &dest) = 0; virtual conveyor<void> write_ready() = 0; }; class os_network_address; class string_network_address; +template<typename T> class network_address { public: using child_variant = @@ -149,14 +155,14 @@ public: virtual uint16_t port() const = 0; }; -class os_network_address : public network_address { +class os_network_address : public network_address<net::Os> { public: virtual ~os_network_address() = default; network_address::child_variant representation() override { return this; } }; -class string_network_address final : public network_address { +class string_network_address final : public network_address<net::Os> { private: std::string address_value_; uint16_t port_value_; @@ -178,10 +184,10 @@ public: /** * Resolve the provided string and uint16 to the preferred storage method */ - virtual conveyor<own<network_address>> + virtual conveyor<own<network_address<T>>> resolve_address(const std::string &addr, uint16_t port_hint = 0) = 0; - virtual error_or<own<network_address>> + virtual error_or<own<network_address<T>>> parse_address(const std::string &addr, uint16_t port_hint = 0) = 0; /** @@ -195,17 +201,17 @@ public: /** * Set up a listener on this address */ - virtual own<server> listen(network_address &bind_addr) = 0; + virtual own<server<T>> listen(network_address<T> &bind_addr) = 0; /** * Connect to a remote address */ - virtual conveyor<own<io_stream>> connect(network_address &address) = 0; + virtual conveyor<own<io_stream<T>>> connect(network_address<T> &address) = 0; /** * Bind a datagram socket at this address. */ - virtual own<datagram> datagram(network_address &address) = 0; + virtual own<datagram<T>> bind_datagram(network_address<T> &address) = 0; }; class io_provider { @@ -222,6 +228,68 @@ struct async_io_context { event_loop &event_loop; event_port &event_port; }; +} + +namespace saw { +template<typename T> +async_io_stream<T>::async_io_stream(own<io_stream<T>> str) + : stream_{std::move(str)}, + read_ready_{stream_->read_ready() + .then([this]() { read_stepper_.read_step(*stream_); }) + .sink()}, + write_ready_{stream_->write_ready() + .then([this]() { write_stepper_.write_step(*stream_); }) + .sink()}, + read_disconnected_{stream_->on_read_disconnected() + .then([this]() { + if (read_stepper_.on_read_disconnect) { + read_stepper_.on_read_disconnect->feed(); + } + }) + .sink()} {} + +template<typename T> +void async_io_stream<T>::read(void *buffer, size_t min_length, size_t max_length) { + SAW_ASSERT(buffer && max_length >= min_length && min_length > 0) { return; } + + SAW_ASSERT(!read_stepper_.read_task.has_value()) { return; } + + read_stepper_.read_task = read_task_and_step_helper::read_io_task{ + buffer, min_length, max_length, 0}; + read_stepper_.read_step(*stream_); +} + +template<typename T> +conveyor<size_t> async_io_stream<T>::read_done() { + auto caf = new_conveyor_and_feeder<size_t>(); + read_stepper_.read_done = std::move(caf.feeder); + return std::move(caf.conveyor); +} + +template<typename T> +conveyor<void> async_io_stream<T>::on_read_disconnected() { + auto caf = new_conveyor_and_feeder<void>(); + read_stepper_.on_read_disconnect = std::move(caf.feeder); + return std::move(caf.conveyor); +} + +template<typename T> +void async_io_stream<T>::write(const void *buffer, size_t length) { + SAW_ASSERT(buffer && length > 0) { return; } + + SAW_ASSERT(!write_stepper_.write_task.has_value()) { return; } + + write_stepper_.write_task = + write_task_and_step_helper::write_io_task{buffer, length, 0}; + write_stepper_.write_step(*stream_); +} + +template<typename T> +conveyor<size_t> async_io_stream<T>::write_done() { + auto caf = new_conveyor_and_feeder<size_t>(); + write_stepper_.write_done = std::move(caf.feeder); + return std::move(caf.conveyor); +} error_or<async_io_context> setup_async_io(); } // namespace saw diff --git a/modules/io/c++/io_unix.cpp b/modules/io/c++/io_unix.cpp index 73e8e3b..1d89e7d 100644 --- a/modules/io/c++/io_unix.cpp +++ b/modules/io/c++/io_unix.cpp @@ -264,7 +264,7 @@ public: ssize_t unix_read(int fd, void *buffer, size_t length); ssize_t unix_write(int fd, const void *buffer, size_t length); -class unix_io_stream final : public io_stream, public i_fd_owner { +class unix_io_stream final : public io_stream<net::Os>, public i_fd_owner { private: own<conveyor_feeder<void>> read_ready_ = nullptr; own<conveyor_feeder<void>> on_read_disconnect_ = nullptr; @@ -299,19 +299,19 @@ public: void notify(uint32_t mask) override; }; -class unix_server final : public server, public i_fd_owner { +class unix_server final : public server<net::Os>, public i_fd_owner { private: - own<conveyor_feeder<own<io_stream>>> accept_feeder_ = nullptr; + own<conveyor_feeder<own<io_stream<net::Os>>>> accept_feeder_ = nullptr; public: unix_server(unix_event_port &event_port, int file_descriptor, int fd_flags); - conveyor<own<io_stream>> accept() override; + conveyor<own<io_stream<net::Os>>> accept() override; void notify(uint32_t mask) override; }; -class unix_datagram final : public datagram, public i_fd_owner { +class unix_datagram final : public datagram<net::Os>, public i_fd_owner { private: own<conveyor_feeder<void>> read_ready_ = nullptr; own<conveyor_feeder<void>> write_ready_ = nullptr; @@ -324,7 +324,7 @@ public: conveyor<void> read_ready() override; error_or<size_t> write(const void *buffer, size_t length, - network_address &dest) override; + network_address<net::Os> &dest) override; conveyor<void> write_ready() override; void notify(uint32_t mask) override; @@ -437,19 +437,19 @@ private: public: unix_network(unix_event_port &event_port); - conveyor<own<network_address>> + conveyor<own<network_address<net::Os>>> resolve_address(const std::string &address, uint16_t port_hint = 0) override; - error_or<own<network_address>> + error_or<own<network_address<net::Os>>> parse_address(const std::string& address, uint16_t port_hint = 0) override; - own<server> listen(network_address &addr) override; + own<server<net::Os>> listen(network_address<net::Os> &addr) override; - conveyor<own<io_stream>> connect(network_address &addr) override; + conveyor<own<io_stream<net::Os>>> connect(network_address<net::Os> &addr) override; - own<class datagram> datagram(network_address &addr) override; + own<datagram<net::Os>> bind_datagram(network_address<net::Os> &addr) override; }; class unix_io_provider final : public io_provider { @@ -462,7 +462,7 @@ private: public: unix_io_provider(unix_event_port &port_ref, own<event_port> port); - class network<net::Os> &get_network() override; + network<net::Os> &get_network() override; own<input_stream> wrap_input_fd(int fd) override; @@ -564,8 +564,8 @@ unix_server::unix_server(unix_event_port &event_port, int file_descriptor, int fd_flags) : i_fd_owner{event_port, file_descriptor, fd_flags, EPOLLIN} {} -conveyor<own<io_stream>> unix_server::accept() { - auto caf = new_conveyor_and_feeder<own<io_stream>>(); +conveyor<own<io_stream<net::Os>>> unix_server::accept() { + auto caf = new_conveyor_and_feeder<own<io_stream<net::Os>>>(); accept_feeder_ = std::move(caf.feeder); return std::move(caf.conveyor); } @@ -624,7 +624,7 @@ conveyor<void> unix_datagram::read_ready() { } error_or<size_t> unix_datagram::write(const void *buffer, size_t length, - network_address &dest) { + network_address<net::Os> &dest) { unix_network_address &unix_dest = static_cast<unix_network_address &>(dest); socket_address &sock_addr = unix_dest.unix_address(); socklen_t sock_addr_length = sock_addr.get_raw_length(); @@ -664,7 +664,7 @@ bool begins_with(const std::string_view &viewed, } std::variant<unix_network_address, unix_network_address *> -translate_network_address_to_unix_network_address(network_address &addr) { +translate_network_address_to_unix_network_address(network_address<net::Os> &addr) { auto addr_variant = addr.representation(); std::variant<unix_network_address, unix_network_address *> os_addr = std::visit( @@ -705,7 +705,7 @@ unix_network_address &translate_to_unix_address_ref( } // namespace -own<server> unix_network::listen(network_address &addr) { +own<server<net::Os>> unix_network::listen(network_address<net::Os> &addr) { auto unix_addr_storage = translate_network_address_to_unix_network_address(addr); unix_network_address &address = @@ -743,7 +743,7 @@ own<server> unix_network::listen(network_address &addr) { #include <iostream> namespace saw { namespace unix { -conveyor<own<io_stream>> unix_network::connect(network_address &addr) { +conveyor<own<io_stream<net::Os>>> unix_network::connect(network_address<net::Os> &addr) { auto unix_addr_storage = translate_network_address_to_unix_network_address(addr); unix_network_address &address = @@ -751,12 +751,12 @@ conveyor<own<io_stream>> unix_network::connect(network_address &addr) { assert(address.unix_address_size() > 0); if (address.unix_address_size() == 0) { - return conveyor<own<io_stream>>{make_error<err::critical>()}; + return conveyor<own<io_stream<net::Os>>>{make_error<err::critical>()}; } int fd = address.unix_address(0).socket(SOCK_STREAM); if (fd < 0) { - return conveyor<own<io_stream>>{make_error<err::disconnected>()}; + return conveyor<own<io_stream<net::Os>>>{make_error<err::disconnected>()}; } own<unix_io_stream> io_str = @@ -780,12 +780,12 @@ conveyor<own<io_stream>> unix_network::connect(network_address &addr) { if (error == EINPROGRESS) { conveyor<void> write_rdy = io_str->write_ready(); - return write_rdy.then([ios = std::move(io_str)] () mutable -> error_or<own<io_stream>> { + return write_rdy.then([ios = std::move(io_str)] () mutable -> error_or<own<io_stream<net::Os>>> { if(!ios){ return make_error<err::invalid_state>("Limit node invalidated"); } - own<io_stream> mov_ios = std::move(ios); + own<io_stream<net::Os>> mov_ios = std::move(ios); /** * This guarantees the old async pipe to not be used anymore */ @@ -795,7 +795,7 @@ conveyor<own<io_stream>> unix_network::connect(network_address &addr) { }); } else if (error != EINTR) { /// @todo Push error message from - return conveyor<own<io_stream>>{make_error<err::disconnected>()}; + return conveyor<own<io_stream<net::Os>>>{make_error<err::disconnected>()}; } } else { success = true; @@ -804,13 +804,13 @@ conveyor<own<io_stream>> unix_network::connect(network_address &addr) { } if (!success) { - return conveyor<own<io_stream>>{make_error<err::disconnected>()}; + return conveyor<own<io_stream<net::Os>>>{make_error<err::disconnected>()}; } - return conveyor<own<io_stream>>{std::move(io_str)}; + return conveyor<own<io_stream<net::Os>>>{std::move(io_str)}; } -own<datagram> unix_network::datagram(network_address &addr) { +own<datagram<net::Os>> unix_network::bind_datagram(network_address<net::Os> &addr) { auto unix_addr_storage = translate_network_address_to_unix_network_address(addr); unix_network_address &address = @@ -853,7 +853,7 @@ size_t unix_network_address::unix_address_size() const { unix_network::unix_network(unix_event_port &event) : event_port_{event} {} -conveyor<own<network_address>> +conveyor<own<network_address<net::Os>>> unix_network::resolve_address(const std::string &path, uint16_t port_hint) { std::string_view addr_view{path}; { @@ -866,11 +866,11 @@ unix_network::resolve_address(const std::string &path, uint16_t port_hint) { std::vector<socket_address> addresses = socket_address::resolve(addr_view, port_hint); - return conveyor<own<network_address>>{ + return conveyor<own<network_address<net::Os>>>{ heap<unix_network_address>(path, port_hint, std::move(addresses))}; } -error_or<own<network_address>> +error_or<own<network_address<net::Os>>> unix_network::parse_address(const std::string& path, uint16_t port_hint){ std::string_view addr_view{path}; { diff --git a/modules/io/examples/echo_client.cpp b/modules/io/examples/echo_client.cpp index e827ce0..69d91a2 100644 --- a/modules/io/examples/echo_client.cpp +++ b/modules/io/examples/echo_client.cpp @@ -30,8 +30,8 @@ int main(){ keep_running = false; }).detach(); - saw::own<saw::network_address> net_addr = nullptr; - saw::own<saw::async_io_stream> async_rmt = nullptr; + saw::own<saw::network_address<saw::net::Os>> net_addr = nullptr; + saw::own<saw::async_io_stream<saw::net::Os>> async_rmt = nullptr; std::array<uint8_t, 32> read_data; uint64_t read_bytes = 0; @@ -39,7 +39,7 @@ int main(){ network.resolve_address(saw::echo_address, saw::echo_port).then([&](auto addr){ net_addr = std::move(addr); network.connect(*net_addr).then([&](auto rmt_srv){ - async_rmt = saw::heap<saw::async_io_stream>(std::move(rmt_srv)); + async_rmt = saw::heap<saw::async_io_stream<saw::net::Os>>(std::move(rmt_srv)); async_rmt->write(&message_content[0], message_content.size()); async_rmt->read(&read_data[0], message_content.size(), read_data.size()-1); diff --git a/modules/io/examples/echo_server.cpp b/modules/io/examples/echo_server.cpp index 4336048..0b0ea1b 100644 --- a/modules/io/examples/echo_server.cpp +++ b/modules/io/examples/echo_server.cpp @@ -4,7 +4,7 @@ #include "echo.hpp" -saw::error_or<void> handle_echo_write(saw::io_stream& rmt_clt, saw::message& state, uint64_t tbw){ +saw::error_or<void> handle_echo_write(saw::io_stream<saw::net::Os>& rmt_clt, saw::message& state, uint64_t tbw){ auto eov = rmt_clt.write(&state.data[state.already_written], tbw); if(eov.is_error()){ return std::move(eov.get_error()); @@ -19,7 +19,7 @@ saw::error_or<void> handle_echo_write(saw::io_stream& rmt_clt, saw::message& sta return saw::void_t{}; } -void handle_echo_message(saw::io_stream& rmt_clt, bool& keep_running, saw::message& state){ +void handle_echo_message(saw::io_stream<saw::net::Os>& rmt_clt, bool& keep_running, saw::message& state){ rmt_clt.read_ready().then([&](){ for(;;){ uint64_t tbr = state.data.size() < state.already_read ? 0: state.data.size() - state.already_read; @@ -113,9 +113,9 @@ int main(){ }).detach(); auto& network = aio.io->get_network(); - saw::own<saw::network_address> addr = nullptr; - saw::own<saw::server> srv = nullptr; - saw::own<saw::io_stream> remote_client = nullptr; + saw::own<saw::network_address<saw::net::Os>> addr = nullptr; + saw::own<saw::server<saw::net::Os>> srv = nullptr; + saw::own<saw::io_stream<saw::net::Os>> remote_client = nullptr; saw::message msg_state; diff --git a/modules/io_codec/c++/io_peer.hpp b/modules/io_codec/c++/io_peer.hpp index 0cb3959..d29c7b8 100644 --- a/modules/io_codec/c++/io_peer.hpp +++ b/modules/io_codec/c++/io_peer.hpp @@ -10,6 +10,7 @@ namespace saw { template <typename Incoming, typename Outgoing, typename TransportEncoding, typename ContentEncoding, + typename Net = net::Os, typename BufferT = ring_buffer> class streaming_io_peer { private: @@ -20,14 +21,14 @@ public: */ streaming_io_peer( own<conveyor_feeder<data<Incoming, ContentEncoding>>> feed, - own<async_io_stream> stream, transport<TransportEncoding> in_codec); + own<async_io_stream<Net>> stream, transport<TransportEncoding> in_codec); /** * Constructor with mostly default assignements */ streaming_io_peer( own<conveyor_feeder<data<Incoming, ContentEncoding>>> feed, - own<async_io_stream> stream); + own<async_io_stream<Net>> stream); /** * Deleted copy and move constructors @@ -54,8 +55,8 @@ private: : public conveyor_feeder<data<Outgoing, ContentEncoding>> { public: peer_conveyor_feeder( - streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, BufferT> &peer_) - : peer_{peer_} {} + streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, Net, BufferT> &peer__) + : peer_{peer__} {} void feed(data<Outgoing, ContentEncoding> &&data_) override { (void)data_; @@ -69,7 +70,7 @@ private: error_or<void> swap(conveyor<data<Outgoing, ContentEncoding>> &&) noexcept override { return make_error<err::not_implemented>();} private: - streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, + streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, Net, BufferT> &peer_; }; @@ -77,7 +78,7 @@ private: own<conveyor_feeder<data<Incoming, ContentEncoding>>> incoming_feeder_ = nullptr; - own<async_io_stream> io_stream_; + own<async_io_stream<Net>> io_stream_; transport<TransportEncoding> in_codec_; @@ -99,10 +100,11 @@ private: */ template <typename Incoming, typename Outgoing, typename TransportEncoding, typename ContentEncoding, + typename Net = net::Os, typename BufferT = ring_buffer> -std::pair<own<streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, BufferT>>, +std::pair<own<streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, Net, BufferT>>, conveyor<data<Incoming, ContentEncoding>>> -new_streaming_io_peer(own<async_io_stream> stream); +new_streaming_io_peer(own<async_io_stream<Net>> stream); } // namespace saw diff --git a/modules/io_codec/c++/io_peer.tmpl.hpp b/modules/io_codec/c++/io_peer.tmpl.hpp index 0322631..e046f92 100644 --- a/modules/io_codec/c++/io_peer.tmpl.hpp +++ b/modules/io_codec/c++/io_peer.tmpl.hpp @@ -1,25 +1,24 @@ namespace saw { } -#include <iostream> namespace saw { -template <typename Incoming, typename Outgoing, typename TransportEncoding, typename ContentEncoding, +template <typename Incoming, typename Outgoing, typename TransportEncoding, typename ContentEncoding, typename Net, typename BufferT> -streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, +streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, Net, BufferT>:: streaming_io_peer( own<conveyor_feeder<data<Incoming, ContentEncoding>>> feed, - own<async_io_stream> str) + own<async_io_stream<Net>> str) : streaming_io_peer{std::move(feed), std::move(str), {}} {} -template <typename Incoming, typename Outgoing, typename TransportEncoding, typename ContentEncoding, typename BufferT> -streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, +template <typename Incoming, typename Outgoing, typename TransportEncoding, typename ContentEncoding, typename Net, typename BufferT> +streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, Net, BufferT>:: streaming_io_peer( own<conveyor_feeder<data<Incoming, ContentEncoding>>> feed, - own<async_io_stream> stream, transport<TransportEncoding> in_codec) + own<async_io_stream<Net>> stream, transport<TransportEncoding> in_codec) : incoming_feeder_{std::move(feed)}, io_stream_{std::move(stream)}, in_codec_{std::move(in_codec)}, @@ -99,8 +98,8 @@ streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, template <typename Incoming, typename Outgoing, typename TransportEncoding, - typename ContentEncoding, typename BufferT> -error_or<void> streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, + typename ContentEncoding, typename Net, typename BufferT> +error_or<void> streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, Net, BufferT>::send(data<Outgoing, ContentEncoding> msg) { bool restart_write = (out_buffer_.read_segment_length() == 0u); @@ -132,10 +131,10 @@ error_or<void> streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentE return void_t{}; } -template <typename Incoming, typename Outgoing, typename TransportEncoding, typename ContentEncoding, +template <typename Incoming, typename Outgoing, typename TransportEncoding, typename ContentEncoding, typename Net, typename BufferT> conveyor<void> -streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, +streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, Net, BufferT>::on_disconnected() { io_read_disconnected_ = io_stream_->on_read_disconnected().then([this](){ if(disconnect_feeder_){ @@ -148,14 +147,14 @@ streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, return std::move(caf.conveyor); } -template <typename Incoming, typename Outgoing, typename TransportEncoding, typename ContentEncoding, typename BufferT> -std::pair<own<streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, BufferT>>, +template <typename Incoming, typename Outgoing, typename TransportEncoding, typename ContentEncoding, typename Net, typename BufferT> +std::pair<own<streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, Net, BufferT>>, conveyor<data<Incoming,ContentEncoding>>> -new_streaming_io_peer(own<async_io_stream> stream) { +new_streaming_io_peer(own<async_io_stream<Net>> stream) { auto caf = new_conveyor_and_feeder<data<Incoming, ContentEncoding>>(); - return {heap<streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding,BufferT>>( + return {heap<streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding,Net,BufferT>>( std::move(caf.feeder), std::move(stream)), std::move(caf.conveyor)}; } diff --git a/modules/io_codec/examples/peer_echo_client.cpp b/modules/io_codec/examples/peer_echo_client.cpp index 942deed..b610372 100644 --- a/modules/io_codec/examples/peer_echo_client.cpp +++ b/modules/io_codec/examples/peer_echo_client.cpp @@ -69,13 +69,13 @@ int main(int argc, char** argv){ codec<sch::Echo, encode::KelSimple> simple_codec; - network.connect(*addr).then([&](saw::own<saw::io_stream> client){ + network.connect(*addr).then([&](saw::own<saw::io_stream<saw::net::Os>> client){ if(!client){ return; } - auto echo_stream = saw::heap<saw::async_io_stream>(std::move(client)); - auto echo_peer_stream_p = saw::new_streaming_io_peer<sch::Echo, sch::Echo, trans::FixedLength<8u>, encode::KelSimple, ring_buffer>(std::move(echo_stream)); + auto echo_stream = saw::heap<saw::async_io_stream<saw::net::Os>>(std::move(client)); + auto echo_peer_stream_p = saw::new_streaming_io_peer<sch::Echo, sch::Echo, trans::FixedLength<8u>, encode::KelSimple>(std::move(echo_stream)); std::cout<<"Connected"<<std::endl; diff --git a/modules/io_codec/examples/peer_echo_server.cpp b/modules/io_codec/examples/peer_echo_server.cpp index 4d201c0..804d9ad 100644 --- a/modules/io_codec/examples/peer_echo_server.cpp +++ b/modules/io_codec/examples/peer_echo_server.cpp @@ -50,13 +50,13 @@ int main(){ return -2; } - echo_srv->accept().then([&](saw::own<saw::io_stream> client){ + echo_srv->accept().then([&](saw::own<saw::io_stream<saw::net::Os>> client){ if(!client){ keep_running = false; return; } - auto echo_stream = saw::heap<saw::async_io_stream>(std::move(client)); - auto echo_peer_stream_p = saw::new_streaming_io_peer<sch::Echo, sch::Echo, trans::FixedLength<8u>, encode::KelSimple, ring_buffer>(std::move(echo_stream)); + auto echo_stream = saw::heap<saw::async_io_stream<saw::net::Os>>(std::move(client)); + auto echo_peer_stream_p = saw::new_streaming_io_peer<sch::Echo, sch::Echo, trans::FixedLength<8u>, encode::KelSimple>(std::move(echo_stream)); std::cout<<"Connected client"<<std::endl; auto peer_str = echo_peer_stream_p.first.get(); |