From b048b02732cbfcfbb95bb8e16dec71aca0e977f4 Mon Sep 17 00:00:00 2001 From: Claudius 'keldu' Holeksa Date: Wed, 16 Oct 2024 18:51:50 +0200 Subject: Reworked abstraction for network --- modules/io/c++/io.hpp | 90 ++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 79 insertions(+), 11 deletions(-) (limited to 'modules/io/c++/io.hpp') diff --git a/modules/io/c++/io.hpp b/modules/io/c++/io.hpp index 390ddcc..fccba7f 100644 --- a/modules/io/c++/io.hpp +++ b/modules/io/c++/io.hpp @@ -55,6 +55,7 @@ public: /* * Io stream */ +template class io_stream : public input_stream, public output_stream { public: virtual ~io_stream() = default; @@ -79,10 +80,11 @@ public: virtual conveyor write_done() = 0; }; +template class async_io_stream final : public async_input_stream, public async_output_stream { private: - own stream_; + own> stream_; conveyor_sink read_ready_; conveyor_sink write_ready_; @@ -92,7 +94,7 @@ private: write_task_and_step_helper write_stepper_; public: - async_io_stream(own str); + async_io_stream(own> str); SAW_FORBID_COPY(async_io_stream); SAW_FORBID_MOVE(async_io_stream); @@ -108,19 +110,22 @@ public: conveyor write_done() override; }; +template class server { public: virtual ~server() = default; - virtual conveyor> accept() = 0; + virtual conveyor>> accept() = 0; }; +template class network_address; /** * Datagram class. Bound to a local address it is able to receive inbound * datagram messages and send them as well as long as an address is provided as * well */ +template class datagram { public: virtual ~datagram() = default; @@ -129,13 +134,14 @@ public: virtual conveyor read_ready() = 0; virtual error_or write(const void *buffer, size_t length, - network_address &dest) = 0; + network_address &dest) = 0; virtual conveyor write_ready() = 0; }; class os_network_address; class string_network_address; +template class network_address { public: using child_variant = @@ -149,14 +155,14 @@ public: virtual uint16_t port() const = 0; }; -class os_network_address : public network_address { +class os_network_address : public network_address { public: virtual ~os_network_address() = default; network_address::child_variant representation() override { return this; } }; -class string_network_address final : public network_address { +class string_network_address final : public network_address { private: std::string address_value_; uint16_t port_value_; @@ -178,10 +184,10 @@ public: /** * Resolve the provided string and uint16 to the preferred storage method */ - virtual conveyor> + virtual conveyor>> resolve_address(const std::string &addr, uint16_t port_hint = 0) = 0; - virtual error_or> + virtual error_or>> parse_address(const std::string &addr, uint16_t port_hint = 0) = 0; /** @@ -195,17 +201,17 @@ public: /** * Set up a listener on this address */ - virtual own listen(network_address &bind_addr) = 0; + virtual own> listen(network_address &bind_addr) = 0; /** * Connect to a remote address */ - virtual conveyor> connect(network_address &address) = 0; + virtual conveyor>> connect(network_address &address) = 0; /** * Bind a datagram socket at this address. */ - virtual own datagram(network_address &address) = 0; + virtual own> bind_datagram(network_address &address) = 0; }; class io_provider { @@ -222,6 +228,68 @@ struct async_io_context { event_loop &event_loop; event_port &event_port; }; +} + +namespace saw { +template +async_io_stream::async_io_stream(own> str) + : stream_{std::move(str)}, + read_ready_{stream_->read_ready() + .then([this]() { read_stepper_.read_step(*stream_); }) + .sink()}, + write_ready_{stream_->write_ready() + .then([this]() { write_stepper_.write_step(*stream_); }) + .sink()}, + read_disconnected_{stream_->on_read_disconnected() + .then([this]() { + if (read_stepper_.on_read_disconnect) { + read_stepper_.on_read_disconnect->feed(); + } + }) + .sink()} {} + +template +void async_io_stream::read(void *buffer, size_t min_length, size_t max_length) { + SAW_ASSERT(buffer && max_length >= min_length && min_length > 0) { return; } + + SAW_ASSERT(!read_stepper_.read_task.has_value()) { return; } + + read_stepper_.read_task = read_task_and_step_helper::read_io_task{ + buffer, min_length, max_length, 0}; + read_stepper_.read_step(*stream_); +} + +template +conveyor async_io_stream::read_done() { + auto caf = new_conveyor_and_feeder(); + read_stepper_.read_done = std::move(caf.feeder); + return std::move(caf.conveyor); +} + +template +conveyor async_io_stream::on_read_disconnected() { + auto caf = new_conveyor_and_feeder(); + read_stepper_.on_read_disconnect = std::move(caf.feeder); + return std::move(caf.conveyor); +} + +template +void async_io_stream::write(const void *buffer, size_t length) { + SAW_ASSERT(buffer && length > 0) { return; } + + SAW_ASSERT(!write_stepper_.write_task.has_value()) { return; } + + write_stepper_.write_task = + write_task_and_step_helper::write_io_task{buffer, length, 0}; + write_stepper_.write_step(*stream_); +} + +template +conveyor async_io_stream::write_done() { + auto caf = new_conveyor_and_feeder(); + write_stepper_.write_done = std::move(caf.feeder); + return std::move(caf.conveyor); +} error_or setup_async_io(); } // namespace saw -- cgit v1.2.3