#include "io.h"

// NOTE(review): the directive below has no header name in the text as
// received; the operand appears to have been lost and must be restored before
// this file can build.
#include

namespace saw {

// NOTE(review): throughout this file `own`, `conveyor` and
// `new_conveyor_and_feeder` appear without template argument lists. Presumably
// angle-bracketed arguments were lost together with the include operand above;
// confirm every signature against the declarations in io.h.

/**
 * Wraps the stream `str` and subscribes to three of its event sources.
 *
 * `str` is moved into `stream_`. The remaining initializers dereference
 * `stream_` without a null check, so `str` must refer to a valid stream.
 * They also rely on `stream_` being initialized before the three sink
 * members, i.e. on the member declaration order in io.h (TODO confirm).
 *
 * Every callback captures `this`, so it must not run after this object has
 * been destroyed. Presumably the stored `.sink()` results keep the
 * subscriptions alive only as long as this object exists; verify against the
 * conveyor implementation.
 */
async_io_stream::async_io_stream(own str)
    : stream_{std::move(str)},
      // When the stream reports it is ready for reading, run one read step.
      read_ready_{stream_->read_ready()
                      .then([this]() { read_stepper_.read_step(*stream_); })
                      .sink()},
      // When the stream reports it is ready for writing, run one write step.
      write_ready_{stream_->write_ready()
                       .then([this]() { write_stepper_.write_step(*stream_); })
                       .sink()},
      // Forward a read-side disconnect, but only if a feeder has been
      // installed through on_read_disconnected().
      read_disconnected_{stream_->on_read_disconnected()
                             .then([this]() {
                                 if (read_stepper_.on_read_disconnect) {
                                     read_stepper_.on_read_disconnect->feed();
                                 }
                             })
                             .sink()} {}

/**
 * Registers a read request and immediately attempts a first read step.
 *
 * @param buffer     destination pointer; checked to be non-null.
 * @param min_length checked to be greater than zero.
 * @param max_length checked to be at least `min_length`.
 *
 * NOTE(review): `SAW_ASSERT(cond) { ... }` is a project macro; presumably the
 * attached block runs when `cond` does not hold, making both checks below
 * early returns. Confirm against the macro definition.
 */
void async_io_stream::read(void *buffer, size_t min_length,
                           size_t max_length) {
    // Argument validation.
    SAW_ASSERT(buffer && max_length >= min_length && min_length > 0) {
        return;
    }

    // Only one read task is held at a time.
    SAW_ASSERT(!read_stepper_.read_task.has_value()) { return; }

    // The trailing 0 is presumably the count of bytes transferred so far;
    // confirm against the definition of read_io_task.
    read_stepper_.read_task = read_task_and_step_helper::read_io_task{
        buffer, min_length, max_length, 0};
    // Try right away instead of waiting for the next read_ready event.
    read_stepper_.read_step(*stream_);
}

/**
 * Creates a fresh conveyor/feeder pair, stores the feeder in
 * `read_stepper_.read_done` and returns the conveyor.
 *
 * Each call overwrites the previously stored feeder.
 */
conveyor async_io_stream::read_done() {
    auto caf = new_conveyor_and_feeder();
    read_stepper_.read_done = std::move(caf.feeder);
    // `caf.conveyor` is a member of a local object, so it is not implicitly
    // moved on return; the explicit std::move is intentional.
    return std::move(caf.conveyor);
}

/**
 * Creates a fresh conveyor/feeder pair, stores the feeder in
 * `read_stepper_.on_read_disconnect` and returns the conveyor.
 *
 * The stored feeder is the one the constructor's disconnect callback feeds.
 * Each call overwrites the previously stored feeder.
 */
conveyor async_io_stream::on_read_disconnected() {
    auto caf = new_conveyor_and_feeder();
    read_stepper_.on_read_disconnect = std::move(caf.feeder);
    // Explicit move: returning a member of a local object.
    return std::move(caf.conveyor);
}

/**
 * Registers a write request and immediately attempts a first write step.
 *
 * @param buffer source pointer; checked to be non-null.
 * @param length checked to be greater than zero.
 *
 * NOTE(review): as in read(), the blocks attached to SAW_ASSERT presumably
 * run when the condition does not hold. Confirm against the macro definition.
 */
void async_io_stream::write(const void *buffer, size_t length) {
    // Argument validation.
    SAW_ASSERT(buffer && length > 0) { return; }

    // Only one write task is held at a time.
    SAW_ASSERT(!write_stepper_.write_task.has_value()) { return; }

    // The trailing 0 is presumably the count of bytes written so far; confirm
    // against the definition of write_io_task.
    write_stepper_.write_task =
        write_task_and_step_helper::write_io_task{buffer, length, 0};
    // Try right away instead of waiting for the next write_ready event.
    write_stepper_.write_step(*stream_);
}

/**
 * Creates a fresh conveyor/feeder pair, stores the feeder in
 * `write_stepper_.write_done` and returns the conveyor.
 *
 * Each call overwrites the previously stored feeder.
 */
conveyor async_io_stream::write_done() {
    auto caf = new_conveyor_and_feeder();
    write_stepper_.write_done = std::move(caf.feeder);
    // Explicit move: returning a member of a local object.
    return std::move(caf.conveyor);
}

/**
 * Stores a copy of `address` and the value of `port`.
 */
string_network_address::string_network_address(const std::string &address,
                                               uint16_t port)
    : address_value_{address}, port_value_{port} {}

/** Returns a reference to the stored address string. */
const std::string &string_network_address::address() const {
    return address_value_;
}

/** Returns the stored port. */
uint16_t string_network_address::port() const { return port_value_; }
} // namespace saw