#pragma once #include #include #include "io_helpers.hpp" #include #include namespace saw { namespace net { struct Os {}; } /** * Set of error common in io */ namespace err { struct disconnected { static constexpr std::string_view description = "Disconnected"; static constexpr bool is_critical = true; }; struct resource_busy { static constexpr std::string_view description = "Resource busy"; static constexpr bool is_critical = false; }; } /* * Input stream */ class input_stream { public: virtual ~input_stream() = default; virtual error_or read(void *buffer, size_t length) = 0; virtual conveyor read_ready() = 0; virtual conveyor on_read_disconnected() = 0; }; /* * Output stream */ class output_stream { public: virtual ~output_stream() = default; virtual error_or write(const void *buffer, size_t length) = 0; virtual conveyor write_ready() = 0; }; /* * Io stream */ template class io_stream : public input_stream, public output_stream { public: virtual ~io_stream() = default; }; class async_input_stream { public: virtual ~async_input_stream() = default; virtual void read(void *buffer, size_t min_length, size_t max_length) = 0; virtual conveyor read_done() = 0; virtual conveyor on_read_disconnected() = 0; }; class async_output_stream { public: virtual ~async_output_stream() = default; virtual void write(const void *buffer, size_t length) = 0; virtual conveyor write_done() = 0; }; template class async_io_stream final : public async_input_stream, public async_output_stream { private: own> stream_; conveyor_sink read_ready_; conveyor_sink write_ready_; conveyor_sink read_disconnected_; read_task_and_step_helper read_stepper_; write_task_and_step_helper write_stepper_; public: async_io_stream(own> str); SAW_FORBID_COPY(async_io_stream); SAW_FORBID_MOVE(async_io_stream); void read(void *buffer, size_t length, size_t max_length) override; conveyor read_done() override; conveyor on_read_disconnected() override; void write(const void *buffer, size_t length) override; conveyor write_done() override; }; template class server { public: virtual ~server() = default; virtual conveyor>> accept() = 0; }; template class network_address; /** * Datagram class. Bound to a local address it is able to receive inbound * datagram messages and send them as well as long as an address is provided as * well */ template class datagram { public: virtual ~datagram() = default; virtual error_or read(void *buffer, size_t length) = 0; virtual conveyor read_ready() = 0; virtual error_or write(const void *buffer, size_t length, network_address &dest) = 0; virtual conveyor write_ready() = 0; }; class os_network_address; class string_network_address; template class network_address { public: using child_variant = std::variant; virtual ~network_address() = default; virtual network_address::child_variant representation() = 0; virtual const std::string &address() const = 0; virtual uint16_t port() const = 0; }; class os_network_address : public network_address { public: virtual ~os_network_address() = default; network_address::child_variant representation() override { return this; } }; class string_network_address final : public network_address { private: std::string address_value_; uint16_t port_value_; public: string_network_address(const std::string &address, uint16_t port); const std::string &address() const override; uint16_t port() const override; network_address::child_variant representation() override { return this; } }; template class network { public: virtual ~network() = default; /** * Resolve the provided string and uint16 to the preferred storage method */ virtual conveyor>> resolve_address(const std::string &addr, uint16_t port_hint = 0) = 0; virtual error_or>> parse_address(const std::string &addr, uint16_t port_hint = 0) = 0; /** * Parse the provided string and uint16 to the preferred storage method * Since no dns request is made here, no async conveyors have to be used. */ /// @todo implement // virtual Own parseAddress(const std::string& addr, // uint16_t port_hint = 0) = 0; /** * Set up a listener on this address */ virtual own> listen(network_address &bind_addr) = 0; /** * Connect to a remote address */ virtual conveyor>> connect(network_address &address) = 0; /** * Bind a datagram socket at this address. */ virtual own> bind_datagram(network_address &address) = 0; }; class io_provider { public: virtual ~io_provider() = default; virtual own wrap_input_fd(int fd) = 0; virtual network &get_network() = 0; }; struct async_io_context { own io; event_loop &event_loop; event_port &event_port; }; } namespace saw { template async_io_stream::async_io_stream(own> str) : stream_{std::move(str)}, read_ready_{stream_->read_ready() .then([this]() { read_stepper_.read_step(*stream_); }) .sink()}, write_ready_{stream_->write_ready() .then([this]() { write_stepper_.write_step(*stream_); }) .sink()}, read_disconnected_{stream_->on_read_disconnected() .then([this]() { if (read_stepper_.on_read_disconnect) { read_stepper_.on_read_disconnect->feed(); } }) .sink()} {} template void async_io_stream::read(void *buffer, size_t min_length, size_t max_length) { SAW_ASSERT(buffer && max_length >= min_length && min_length > 0) { return; } SAW_ASSERT(!read_stepper_.read_task.has_value()) { return; } read_stepper_.read_task = read_task_and_step_helper::read_io_task{ buffer, min_length, max_length, 0}; read_stepper_.read_step(*stream_); } template conveyor async_io_stream::read_done() { auto caf = new_conveyor_and_feeder(); read_stepper_.read_done = std::move(caf.feeder); return std::move(caf.conveyor); } template conveyor async_io_stream::on_read_disconnected() { auto caf = new_conveyor_and_feeder(); read_stepper_.on_read_disconnect = std::move(caf.feeder); return std::move(caf.conveyor); } template void async_io_stream::write(const void *buffer, size_t length) { SAW_ASSERT(buffer && length > 0) { return; } SAW_ASSERT(!write_stepper_.write_task.has_value()) { return; } write_stepper_.write_task = write_task_and_step_helper::write_io_task{buffer, length, 0}; write_stepper_.write_step(*stream_); } template conveyor async_io_stream::write_done() { auto caf = new_conveyor_and_feeder(); write_stepper_.write_done = std::move(caf.feeder); return std::move(caf.conveyor); } error_or setup_async_io(); } // namespace saw