diff options
Diffstat (limited to 'modules/io/c++/io.hpp')
-rw-r--r-- | modules/io/c++/io.hpp | 90 |
1 files changed, 79 insertions, 11 deletions
diff --git a/modules/io/c++/io.hpp b/modules/io/c++/io.hpp index 390ddcc..fccba7f 100644 --- a/modules/io/c++/io.hpp +++ b/modules/io/c++/io.hpp @@ -55,6 +55,7 @@ public: /* * Io stream */ +template<typename T = net::Os> class io_stream : public input_stream, public output_stream { public: virtual ~io_stream() = default; @@ -79,10 +80,11 @@ public: virtual conveyor<size_t> write_done() = 0; }; +template<typename T = net::Os> class async_io_stream final : public async_input_stream, public async_output_stream { private: - own<io_stream> stream_; + own<io_stream<T>> stream_; conveyor_sink read_ready_; conveyor_sink write_ready_; @@ -92,7 +94,7 @@ private: write_task_and_step_helper write_stepper_; public: - async_io_stream(own<io_stream> str); + async_io_stream(own<io_stream<T>> str); SAW_FORBID_COPY(async_io_stream); SAW_FORBID_MOVE(async_io_stream); @@ -108,19 +110,22 @@ public: conveyor<size_t> write_done() override; }; +template<typename T = net::Os> class server { public: virtual ~server() = default; - virtual conveyor<own<io_stream>> accept() = 0; + virtual conveyor<own<io_stream<T>>> accept() = 0; }; +template<typename T = net::Os> class network_address; /** * Datagram class. Bound to a local address it is able to receive inbound * datagram messages and send them as well as long as an address is provided as * well */ +template<typename T = net::Os> class datagram { public: virtual ~datagram() = default; @@ -129,13 +134,14 @@ public: virtual conveyor<void> read_ready() = 0; virtual error_or<size_t> write(const void *buffer, size_t length, - network_address &dest) = 0; + network_address<T> &dest) = 0; virtual conveyor<void> write_ready() = 0; }; class os_network_address; class string_network_address; +template<typename T> class network_address { public: using child_variant = @@ -149,14 +155,14 @@ public: virtual uint16_t port() const = 0; }; -class os_network_address : public network_address { +class os_network_address : public network_address<net::Os> { public: virtual ~os_network_address() = default; network_address::child_variant representation() override { return this; } }; -class string_network_address final : public network_address { +class string_network_address final : public network_address<net::Os> { private: std::string address_value_; uint16_t port_value_; @@ -178,10 +184,10 @@ public: /** * Resolve the provided string and uint16 to the preferred storage method */ - virtual conveyor<own<network_address>> + virtual conveyor<own<network_address<T>>> resolve_address(const std::string &addr, uint16_t port_hint = 0) = 0; - virtual error_or<own<network_address>> + virtual error_or<own<network_address<T>>> parse_address(const std::string &addr, uint16_t port_hint = 0) = 0; /** @@ -195,17 +201,17 @@ public: /** * Set up a listener on this address */ - virtual own<server> listen(network_address &bind_addr) = 0; + virtual own<server<T>> listen(network_address<T> &bind_addr) = 0; /** * Connect to a remote address */ - virtual conveyor<own<io_stream>> connect(network_address &address) = 0; + virtual conveyor<own<io_stream<T>>> connect(network_address<T> &address) = 0; /** * Bind a datagram socket at this address. */ - virtual own<datagram> datagram(network_address &address) = 0; + virtual own<datagram<T>> bind_datagram(network_address<T> &address) = 0; }; class io_provider { @@ -222,6 +228,68 @@ struct async_io_context { event_loop &event_loop; event_port &event_port; }; +} + +namespace saw { +template<typename T> +async_io_stream<T>::async_io_stream(own<io_stream<T>> str) + : stream_{std::move(str)}, + read_ready_{stream_->read_ready() + .then([this]() { read_stepper_.read_step(*stream_); }) + .sink()}, + write_ready_{stream_->write_ready() + .then([this]() { write_stepper_.write_step(*stream_); }) + .sink()}, + read_disconnected_{stream_->on_read_disconnected() + .then([this]() { + if (read_stepper_.on_read_disconnect) { + read_stepper_.on_read_disconnect->feed(); + } + }) + .sink()} {} + +template<typename T> +void async_io_stream<T>::read(void *buffer, size_t min_length, size_t max_length) { + SAW_ASSERT(buffer && max_length >= min_length && min_length > 0) { return; } + + SAW_ASSERT(!read_stepper_.read_task.has_value()) { return; } + + read_stepper_.read_task = read_task_and_step_helper::read_io_task{ + buffer, min_length, max_length, 0}; + read_stepper_.read_step(*stream_); +} + +template<typename T> +conveyor<size_t> async_io_stream<T>::read_done() { + auto caf = new_conveyor_and_feeder<size_t>(); + read_stepper_.read_done = std::move(caf.feeder); + return std::move(caf.conveyor); +} + +template<typename T> +conveyor<void> async_io_stream<T>::on_read_disconnected() { + auto caf = new_conveyor_and_feeder<void>(); + read_stepper_.on_read_disconnect = std::move(caf.feeder); + return std::move(caf.conveyor); +} + +template<typename T> +void async_io_stream<T>::write(const void *buffer, size_t length) { + SAW_ASSERT(buffer && length > 0) { return; } + + SAW_ASSERT(!write_stepper_.write_task.has_value()) { return; } + + write_stepper_.write_task = + write_task_and_step_helper::write_io_task{buffer, length, 0}; + write_stepper_.write_step(*stream_); +} + +template<typename T> +conveyor<size_t> async_io_stream<T>::write_done() { + auto caf = new_conveyor_and_feeder<size_t>(); + write_stepper_.write_done = std::move(caf.feeder); + return std::move(caf.conveyor); +} error_or<async_io_context> setup_async_io(); } // namespace saw |