diff options
Diffstat (limited to 'modules/io/c++')
-rw-r--r-- | modules/io/c++/io.cpp | 53 | ||||
-rw-r--r-- | modules/io/c++/io.hpp | 90 | ||||
-rw-r--r-- | modules/io/c++/io_unix.cpp | 58 |
3 files changed, 108 insertions, 93 deletions
diff --git a/modules/io/c++/io.cpp b/modules/io/c++/io.cpp index 06c9cbb..50423e1 100644 --- a/modules/io/c++/io.cpp +++ b/modules/io/c++/io.cpp @@ -4,59 +4,6 @@ namespace saw { -async_io_stream::async_io_stream(own<io_stream> str) - : stream_{std::move(str)}, - read_ready_{stream_->read_ready() - .then([this]() { read_stepper_.read_step(*stream_); }) - .sink()}, - write_ready_{stream_->write_ready() - .then([this]() { write_stepper_.write_step(*stream_); }) - .sink()}, - read_disconnected_{stream_->on_read_disconnected() - .then([this]() { - if (read_stepper_.on_read_disconnect) { - read_stepper_.on_read_disconnect->feed(); - } - }) - .sink()} {} - -void async_io_stream::read(void *buffer, size_t min_length, size_t max_length) { - SAW_ASSERT(buffer && max_length >= min_length && min_length > 0) { return; } - - SAW_ASSERT(!read_stepper_.read_task.has_value()) { return; } - - read_stepper_.read_task = read_task_and_step_helper::read_io_task{ - buffer, min_length, max_length, 0}; - read_stepper_.read_step(*stream_); -} - -conveyor<size_t> async_io_stream::read_done() { - auto caf = new_conveyor_and_feeder<size_t>(); - read_stepper_.read_done = std::move(caf.feeder); - return std::move(caf.conveyor); -} - -conveyor<void> async_io_stream::on_read_disconnected() { - auto caf = new_conveyor_and_feeder<void>(); - read_stepper_.on_read_disconnect = std::move(caf.feeder); - return std::move(caf.conveyor); -} - -void async_io_stream::write(const void *buffer, size_t length) { - SAW_ASSERT(buffer && length > 0) { return; } - - SAW_ASSERT(!write_stepper_.write_task.has_value()) { return; } - - write_stepper_.write_task = - write_task_and_step_helper::write_io_task{buffer, length, 0}; - write_stepper_.write_step(*stream_); -} - -conveyor<size_t> async_io_stream::write_done() { - auto caf = new_conveyor_and_feeder<size_t>(); - write_stepper_.write_done = std::move(caf.feeder); - return std::move(caf.conveyor); -} string_network_address::string_network_address(const std::string &address, uint16_t port) diff --git a/modules/io/c++/io.hpp b/modules/io/c++/io.hpp index 390ddcc..fccba7f 100644 --- a/modules/io/c++/io.hpp +++ b/modules/io/c++/io.hpp @@ -55,6 +55,7 @@ public: /* * Io stream */ +template<typename T = net::Os> class io_stream : public input_stream, public output_stream { public: virtual ~io_stream() = default; @@ -79,10 +80,11 @@ public: virtual conveyor<size_t> write_done() = 0; }; +template<typename T = net::Os> class async_io_stream final : public async_input_stream, public async_output_stream { private: - own<io_stream> stream_; + own<io_stream<T>> stream_; conveyor_sink read_ready_; conveyor_sink write_ready_; @@ -92,7 +94,7 @@ private: write_task_and_step_helper write_stepper_; public: - async_io_stream(own<io_stream> str); + async_io_stream(own<io_stream<T>> str); SAW_FORBID_COPY(async_io_stream); SAW_FORBID_MOVE(async_io_stream); @@ -108,19 +110,22 @@ public: conveyor<size_t> write_done() override; }; +template<typename T = net::Os> class server { public: virtual ~server() = default; - virtual conveyor<own<io_stream>> accept() = 0; + virtual conveyor<own<io_stream<T>>> accept() = 0; }; +template<typename T = net::Os> class network_address; /** * Datagram class. Bound to a local address it is able to receive inbound * datagram messages and send them as well as long as an address is provided as * well */ +template<typename T = net::Os> class datagram { public: virtual ~datagram() = default; @@ -129,13 +134,14 @@ public: virtual conveyor<void> read_ready() = 0; virtual error_or<size_t> write(const void *buffer, size_t length, - network_address &dest) = 0; + network_address<T> &dest) = 0; virtual conveyor<void> write_ready() = 0; }; class os_network_address; class string_network_address; +template<typename T> class network_address { public: using child_variant = @@ -149,14 +155,14 @@ public: virtual uint16_t port() const = 0; }; -class os_network_address : public network_address { +class os_network_address : public network_address<net::Os> { public: virtual ~os_network_address() = default; network_address::child_variant representation() override { return this; } }; -class string_network_address final : public network_address { +class string_network_address final : public network_address<net::Os> { private: std::string address_value_; uint16_t port_value_; @@ -178,10 +184,10 @@ public: /** * Resolve the provided string and uint16 to the preferred storage method */ - virtual conveyor<own<network_address>> + virtual conveyor<own<network_address<T>>> resolve_address(const std::string &addr, uint16_t port_hint = 0) = 0; - virtual error_or<own<network_address>> + virtual error_or<own<network_address<T>>> parse_address(const std::string &addr, uint16_t port_hint = 0) = 0; /** @@ -195,17 +201,17 @@ public: /** * Set up a listener on this address */ - virtual own<server> listen(network_address &bind_addr) = 0; + virtual own<server<T>> listen(network_address<T> &bind_addr) = 0; /** * Connect to a remote address */ - virtual conveyor<own<io_stream>> connect(network_address &address) = 0; + virtual conveyor<own<io_stream<T>>> connect(network_address<T> &address) = 0; /** * Bind a datagram socket at this address. */ - virtual own<datagram> datagram(network_address &address) = 0; + virtual own<datagram<T>> bind_datagram(network_address<T> &address) = 0; }; class io_provider { @@ -222,6 +228,68 @@ struct async_io_context { event_loop &event_loop; event_port &event_port; }; +} + +namespace saw { +template<typename T> +async_io_stream<T>::async_io_stream(own<io_stream<T>> str) + : stream_{std::move(str)}, + read_ready_{stream_->read_ready() + .then([this]() { read_stepper_.read_step(*stream_); }) + .sink()}, + write_ready_{stream_->write_ready() + .then([this]() { write_stepper_.write_step(*stream_); }) + .sink()}, + read_disconnected_{stream_->on_read_disconnected() + .then([this]() { + if (read_stepper_.on_read_disconnect) { + read_stepper_.on_read_disconnect->feed(); + } + }) + .sink()} {} + +template<typename T> +void async_io_stream<T>::read(void *buffer, size_t min_length, size_t max_length) { + SAW_ASSERT(buffer && max_length >= min_length && min_length > 0) { return; } + + SAW_ASSERT(!read_stepper_.read_task.has_value()) { return; } + + read_stepper_.read_task = read_task_and_step_helper::read_io_task{ + buffer, min_length, max_length, 0}; + read_stepper_.read_step(*stream_); +} + +template<typename T> +conveyor<size_t> async_io_stream<T>::read_done() { + auto caf = new_conveyor_and_feeder<size_t>(); + read_stepper_.read_done = std::move(caf.feeder); + return std::move(caf.conveyor); +} + +template<typename T> +conveyor<void> async_io_stream<T>::on_read_disconnected() { + auto caf = new_conveyor_and_feeder<void>(); + read_stepper_.on_read_disconnect = std::move(caf.feeder); + return std::move(caf.conveyor); +} + +template<typename T> +void async_io_stream<T>::write(const void *buffer, size_t length) { + SAW_ASSERT(buffer && length > 0) { return; } + + SAW_ASSERT(!write_stepper_.write_task.has_value()) { return; } + + write_stepper_.write_task = + write_task_and_step_helper::write_io_task{buffer, length, 0}; + write_stepper_.write_step(*stream_); +} + +template<typename T> +conveyor<size_t> async_io_stream<T>::write_done() { + auto caf = new_conveyor_and_feeder<size_t>(); + write_stepper_.write_done = std::move(caf.feeder); + return std::move(caf.conveyor); +} error_or<async_io_context> setup_async_io(); } // namespace saw diff --git a/modules/io/c++/io_unix.cpp b/modules/io/c++/io_unix.cpp index 73e8e3b..1d89e7d 100644 --- a/modules/io/c++/io_unix.cpp +++ b/modules/io/c++/io_unix.cpp @@ -264,7 +264,7 @@ public: ssize_t unix_read(int fd, void *buffer, size_t length); ssize_t unix_write(int fd, const void *buffer, size_t length); -class unix_io_stream final : public io_stream, public i_fd_owner { +class unix_io_stream final : public io_stream<net::Os>, public i_fd_owner { private: own<conveyor_feeder<void>> read_ready_ = nullptr; own<conveyor_feeder<void>> on_read_disconnect_ = nullptr; @@ -299,19 +299,19 @@ public: void notify(uint32_t mask) override; }; -class unix_server final : public server, public i_fd_owner { +class unix_server final : public server<net::Os>, public i_fd_owner { private: - own<conveyor_feeder<own<io_stream>>> accept_feeder_ = nullptr; + own<conveyor_feeder<own<io_stream<net::Os>>>> accept_feeder_ = nullptr; public: unix_server(unix_event_port &event_port, int file_descriptor, int fd_flags); - conveyor<own<io_stream>> accept() override; + conveyor<own<io_stream<net::Os>>> accept() override; void notify(uint32_t mask) override; }; -class unix_datagram final : public datagram, public i_fd_owner { +class unix_datagram final : public datagram<net::Os>, public i_fd_owner { private: own<conveyor_feeder<void>> read_ready_ = nullptr; own<conveyor_feeder<void>> write_ready_ = nullptr; @@ -324,7 +324,7 @@ public: conveyor<void> read_ready() override; error_or<size_t> write(const void *buffer, size_t length, - network_address &dest) override; + network_address<net::Os> &dest) override; conveyor<void> write_ready() override; void notify(uint32_t mask) override; @@ -437,19 +437,19 @@ private: public: unix_network(unix_event_port &event_port); - conveyor<own<network_address>> + conveyor<own<network_address<net::Os>>> resolve_address(const std::string &address, uint16_t port_hint = 0) override; - error_or<own<network_address>> + error_or<own<network_address<net::Os>>> parse_address(const std::string& address, uint16_t port_hint = 0) override; - own<server> listen(network_address &addr) override; + own<server<net::Os>> listen(network_address<net::Os> &addr) override; - conveyor<own<io_stream>> connect(network_address &addr) override; + conveyor<own<io_stream<net::Os>>> connect(network_address<net::Os> &addr) override; - own<class datagram> datagram(network_address &addr) override; + own<datagram<net::Os>> bind_datagram(network_address<net::Os> &addr) override; }; class unix_io_provider final : public io_provider { @@ -462,7 +462,7 @@ private: public: unix_io_provider(unix_event_port &port_ref, own<event_port> port); - class network<net::Os> &get_network() override; + network<net::Os> &get_network() override; own<input_stream> wrap_input_fd(int fd) override; @@ -564,8 +564,8 @@ unix_server::unix_server(unix_event_port &event_port, int file_descriptor, int fd_flags) : i_fd_owner{event_port, file_descriptor, fd_flags, EPOLLIN} {} -conveyor<own<io_stream>> unix_server::accept() { - auto caf = new_conveyor_and_feeder<own<io_stream>>(); +conveyor<own<io_stream<net::Os>>> unix_server::accept() { + auto caf = new_conveyor_and_feeder<own<io_stream<net::Os>>>(); accept_feeder_ = std::move(caf.feeder); return std::move(caf.conveyor); } @@ -624,7 +624,7 @@ conveyor<void> unix_datagram::read_ready() { } error_or<size_t> unix_datagram::write(const void *buffer, size_t length, - network_address &dest) { + network_address<net::Os> &dest) { unix_network_address &unix_dest = static_cast<unix_network_address &>(dest); socket_address &sock_addr = unix_dest.unix_address(); socklen_t sock_addr_length = sock_addr.get_raw_length(); @@ -664,7 +664,7 @@ bool begins_with(const std::string_view &viewed, } std::variant<unix_network_address, unix_network_address *> -translate_network_address_to_unix_network_address(network_address &addr) { +translate_network_address_to_unix_network_address(network_address<net::Os> &addr) { auto addr_variant = addr.representation(); std::variant<unix_network_address, unix_network_address *> os_addr = std::visit( @@ -705,7 +705,7 @@ unix_network_address &translate_to_unix_address_ref( } // namespace -own<server> unix_network::listen(network_address &addr) { +own<server<net::Os>> unix_network::listen(network_address<net::Os> &addr) { auto unix_addr_storage = translate_network_address_to_unix_network_address(addr); unix_network_address &address = @@ -743,7 +743,7 @@ own<server> unix_network::listen(network_address &addr) { #include <iostream> namespace saw { namespace unix { -conveyor<own<io_stream>> unix_network::connect(network_address &addr) { +conveyor<own<io_stream<net::Os>>> unix_network::connect(network_address<net::Os> &addr) { auto unix_addr_storage = translate_network_address_to_unix_network_address(addr); unix_network_address &address = @@ -751,12 +751,12 @@ conveyor<own<io_stream>> unix_network::connect(network_address &addr) { assert(address.unix_address_size() > 0); if (address.unix_address_size() == 0) { - return conveyor<own<io_stream>>{make_error<err::critical>()}; + return conveyor<own<io_stream<net::Os>>>{make_error<err::critical>()}; } int fd = address.unix_address(0).socket(SOCK_STREAM); if (fd < 0) { - return conveyor<own<io_stream>>{make_error<err::disconnected>()}; + return conveyor<own<io_stream<net::Os>>>{make_error<err::disconnected>()}; } own<unix_io_stream> io_str = @@ -780,12 +780,12 @@ conveyor<own<io_stream>> unix_network::connect(network_address &addr) { if (error == EINPROGRESS) { conveyor<void> write_rdy = io_str->write_ready(); - return write_rdy.then([ios = std::move(io_str)] () mutable -> error_or<own<io_stream>> { + return write_rdy.then([ios = std::move(io_str)] () mutable -> error_or<own<io_stream<net::Os>>> { if(!ios){ return make_error<err::invalid_state>("Limit node invalidated"); } - own<io_stream> mov_ios = std::move(ios); + own<io_stream<net::Os>> mov_ios = std::move(ios); /** * This guarantees the old async pipe to not be used anymore */ @@ -795,7 +795,7 @@ conveyor<own<io_stream>> unix_network::connect(network_address &addr) { }); } else if (error != EINTR) { /// @todo Push error message from - return conveyor<own<io_stream>>{make_error<err::disconnected>()}; + return conveyor<own<io_stream<net::Os>>>{make_error<err::disconnected>()}; } } else { success = true; @@ -804,13 +804,13 @@ conveyor<own<io_stream>> unix_network::connect(network_address &addr) { } if (!success) { - return conveyor<own<io_stream>>{make_error<err::disconnected>()}; + return conveyor<own<io_stream<net::Os>>>{make_error<err::disconnected>()}; } - return conveyor<own<io_stream>>{std::move(io_str)}; + return conveyor<own<io_stream<net::Os>>>{std::move(io_str)}; } -own<datagram> unix_network::datagram(network_address &addr) { +own<datagram<net::Os>> unix_network::bind_datagram(network_address<net::Os> &addr) { auto unix_addr_storage = translate_network_address_to_unix_network_address(addr); unix_network_address &address = @@ -853,7 +853,7 @@ size_t unix_network_address::unix_address_size() const { unix_network::unix_network(unix_event_port &event) : event_port_{event} {} -conveyor<own<network_address>> +conveyor<own<network_address<net::Os>>> unix_network::resolve_address(const std::string &path, uint16_t port_hint) { std::string_view addr_view{path}; { @@ -866,11 +866,11 @@ unix_network::resolve_address(const std::string &path, uint16_t port_hint) { std::vector<socket_address> addresses = socket_address::resolve(addr_view, port_hint); - return conveyor<own<network_address>>{ + return conveyor<own<network_address<net::Os>>>{ heap<unix_network_address>(path, port_hint, std::move(addresses))}; } -error_or<own<network_address>> +error_or<own<network_address<net::Os>>> unix_network::parse_address(const std::string& path, uint16_t port_hint){ std::string_view addr_view{path}; { |