summaryrefslogtreecommitdiff
path: root/modules/io/c++/io.hpp
diff options
context:
space:
mode:
authorClaudius 'keldu' Holeksa <mail@keldu.de>2024-10-16 18:51:50 +0200
committerClaudius 'keldu' Holeksa <mail@keldu.de>2024-10-16 18:51:50 +0200
commitb048b02732cbfcfbb95bb8e16dec71aca0e977f4 (patch)
tree8f948159473f40ca42c73d75e61d8d03c3b38f24 /modules/io/c++/io.hpp
parentb10d0b0e1a30eb02777f5a02b81bf45e09749edb (diff)
Reworked abstraction for network
Diffstat (limited to 'modules/io/c++/io.hpp')
-rw-r--r--modules/io/c++/io.hpp90
1 files changed, 79 insertions, 11 deletions
diff --git a/modules/io/c++/io.hpp b/modules/io/c++/io.hpp
index 390ddcc..fccba7f 100644
--- a/modules/io/c++/io.hpp
+++ b/modules/io/c++/io.hpp
@@ -55,6 +55,7 @@ public:
/*
* Io stream
*/
+template<typename T = net::Os>
class io_stream : public input_stream, public output_stream {
public:
virtual ~io_stream() = default;
@@ -79,10 +80,11 @@ public:
virtual conveyor<size_t> write_done() = 0;
};
+template<typename T = net::Os>
class async_io_stream final : public async_input_stream,
public async_output_stream {
private:
- own<io_stream> stream_;
+ own<io_stream<T>> stream_;
conveyor_sink read_ready_;
conveyor_sink write_ready_;
@@ -92,7 +94,7 @@ private:
write_task_and_step_helper write_stepper_;
public:
- async_io_stream(own<io_stream> str);
+ async_io_stream(own<io_stream<T>> str);
SAW_FORBID_COPY(async_io_stream);
SAW_FORBID_MOVE(async_io_stream);
@@ -108,19 +110,22 @@ public:
conveyor<size_t> write_done() override;
};
+template<typename T = net::Os>
class server {
public:
virtual ~server() = default;
- virtual conveyor<own<io_stream>> accept() = 0;
+ virtual conveyor<own<io_stream<T>>> accept() = 0;
};
+template<typename T = net::Os>
class network_address;
/**
* Datagram class. Bound to a local address it is able to receive inbound
* datagram messages and send them as well as long as an address is provided as
* well
*/
+template<typename T = net::Os>
class datagram {
public:
virtual ~datagram() = default;
@@ -129,13 +134,14 @@ public:
virtual conveyor<void> read_ready() = 0;
virtual error_or<size_t> write(const void *buffer, size_t length,
- network_address &dest) = 0;
+ network_address<T> &dest) = 0;
virtual conveyor<void> write_ready() = 0;
};
class os_network_address;
class string_network_address;
+template<typename T>
class network_address {
public:
using child_variant =
@@ -149,14 +155,14 @@ public:
virtual uint16_t port() const = 0;
};
-class os_network_address : public network_address {
+class os_network_address : public network_address<net::Os> {
public:
virtual ~os_network_address() = default;
network_address::child_variant representation() override { return this; }
};
-class string_network_address final : public network_address {
+class string_network_address final : public network_address<net::Os> {
private:
std::string address_value_;
uint16_t port_value_;
@@ -178,10 +184,10 @@ public:
/**
* Resolve the provided string and uint16 to the preferred storage method
*/
- virtual conveyor<own<network_address>>
+ virtual conveyor<own<network_address<T>>>
resolve_address(const std::string &addr, uint16_t port_hint = 0) = 0;
- virtual error_or<own<network_address>>
+ virtual error_or<own<network_address<T>>>
parse_address(const std::string &addr, uint16_t port_hint = 0) = 0;
/**
@@ -195,17 +201,17 @@ public:
/**
* Set up a listener on this address
*/
- virtual own<server> listen(network_address &bind_addr) = 0;
+ virtual own<server<T>> listen(network_address<T> &bind_addr) = 0;
/**
* Connect to a remote address
*/
- virtual conveyor<own<io_stream>> connect(network_address &address) = 0;
+ virtual conveyor<own<io_stream<T>>> connect(network_address<T> &address) = 0;
/**
* Bind a datagram socket at this address.
*/
- virtual own<datagram> datagram(network_address &address) = 0;
+ virtual own<datagram<T>> bind_datagram(network_address<T> &address) = 0;
};
class io_provider {
@@ -222,6 +228,68 @@ struct async_io_context {
event_loop &event_loop;
event_port &event_port;
};
+}
+
+namespace saw {
+template<typename T>
+async_io_stream<T>::async_io_stream(own<io_stream<T>> str)
+ : stream_{std::move(str)},
+ read_ready_{stream_->read_ready()
+ .then([this]() { read_stepper_.read_step(*stream_); })
+ .sink()},
+ write_ready_{stream_->write_ready()
+ .then([this]() { write_stepper_.write_step(*stream_); })
+ .sink()},
+ read_disconnected_{stream_->on_read_disconnected()
+ .then([this]() {
+ if (read_stepper_.on_read_disconnect) {
+ read_stepper_.on_read_disconnect->feed();
+ }
+ })
+ .sink()} {}
+
+template<typename T>
+void async_io_stream<T>::read(void *buffer, size_t min_length, size_t max_length) {
+ SAW_ASSERT(buffer && max_length >= min_length && min_length > 0) { return; }
+
+ SAW_ASSERT(!read_stepper_.read_task.has_value()) { return; }
+
+ read_stepper_.read_task = read_task_and_step_helper::read_io_task{
+ buffer, min_length, max_length, 0};
+ read_stepper_.read_step(*stream_);
+}
+
+template<typename T>
+conveyor<size_t> async_io_stream<T>::read_done() {
+ auto caf = new_conveyor_and_feeder<size_t>();
+ read_stepper_.read_done = std::move(caf.feeder);
+ return std::move(caf.conveyor);
+}
+
+template<typename T>
+conveyor<void> async_io_stream<T>::on_read_disconnected() {
+ auto caf = new_conveyor_and_feeder<void>();
+ read_stepper_.on_read_disconnect = std::move(caf.feeder);
+ return std::move(caf.conveyor);
+}
+
+template<typename T>
+void async_io_stream<T>::write(const void *buffer, size_t length) {
+ SAW_ASSERT(buffer && length > 0) { return; }
+
+ SAW_ASSERT(!write_stepper_.write_task.has_value()) { return; }
+
+ write_stepper_.write_task =
+ write_task_and_step_helper::write_io_task{buffer, length, 0};
+ write_stepper_.write_step(*stream_);
+}
+
+template<typename T>
+conveyor<size_t> async_io_stream<T>::write_done() {
+ auto caf = new_conveyor_and_feeder<size_t>();
+ write_stepper_.write_done = std::move(caf.feeder);
+ return std::move(caf.conveyor);
+}
error_or<async_io_context> setup_async_io();
} // namespace saw