summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorClaudius 'keldu' Holeksa <mail@keldu.de>2024-10-16 18:51:50 +0200
committerClaudius 'keldu' Holeksa <mail@keldu.de>2024-10-16 18:51:50 +0200
commitb048b02732cbfcfbb95bb8e16dec71aca0e977f4 (patch)
tree8f948159473f40ca42c73d75e61d8d03c3b38f24
parentb10d0b0e1a30eb02777f5a02b81bf45e09749edb (diff)
Reworked abstraction for network
-rw-r--r--modules/io-tls/tls.hpp12
-rw-r--r--modules/io/c++/io.cpp53
-rw-r--r--modules/io/c++/io.hpp90
-rw-r--r--modules/io/c++/io_unix.cpp58
-rw-r--r--modules/io/examples/echo_client.cpp6
-rw-r--r--modules/io/examples/echo_server.cpp10
-rw-r--r--modules/io_codec/c++/io_peer.hpp18
-rw-r--r--modules/io_codec/c++/io_peer.tmpl.hpp29
-rw-r--r--modules/io_codec/examples/peer_echo_client.cpp6
-rw-r--r--modules/io_codec/examples/peer_echo_server.cpp6
10 files changed, 157 insertions, 131 deletions
diff --git a/modules/io-tls/tls.hpp b/modules/io-tls/tls.hpp
index 93ec180..e2202f4 100644
--- a/modules/io-tls/tls.hpp
+++ b/modules/io-tls/tls.hpp
@@ -7,6 +7,11 @@
#include <variant>
namespace saw {
+namespace net {
+template<typename T = net::Os>
+struct Tls {};
+}
+
class tls;
/**
@@ -37,6 +42,11 @@ private:
options options_;
};
-error_or<own<network>> setup_tls_network(network &network);
+template<>
+class network<net::Tls> {
+};
+
+template<typename T = net::Os>
+error_or<own<network<net::Tls<T>>>> setup_tls_network(network<T> &network);
} // namespace saw
diff --git a/modules/io/c++/io.cpp b/modules/io/c++/io.cpp
index 06c9cbb..50423e1 100644
--- a/modules/io/c++/io.cpp
+++ b/modules/io/c++/io.cpp
@@ -4,59 +4,6 @@
namespace saw {
-async_io_stream::async_io_stream(own<io_stream> str)
- : stream_{std::move(str)},
- read_ready_{stream_->read_ready()
- .then([this]() { read_stepper_.read_step(*stream_); })
- .sink()},
- write_ready_{stream_->write_ready()
- .then([this]() { write_stepper_.write_step(*stream_); })
- .sink()},
- read_disconnected_{stream_->on_read_disconnected()
- .then([this]() {
- if (read_stepper_.on_read_disconnect) {
- read_stepper_.on_read_disconnect->feed();
- }
- })
- .sink()} {}
-
-void async_io_stream::read(void *buffer, size_t min_length, size_t max_length) {
- SAW_ASSERT(buffer && max_length >= min_length && min_length > 0) { return; }
-
- SAW_ASSERT(!read_stepper_.read_task.has_value()) { return; }
-
- read_stepper_.read_task = read_task_and_step_helper::read_io_task{
- buffer, min_length, max_length, 0};
- read_stepper_.read_step(*stream_);
-}
-
-conveyor<size_t> async_io_stream::read_done() {
- auto caf = new_conveyor_and_feeder<size_t>();
- read_stepper_.read_done = std::move(caf.feeder);
- return std::move(caf.conveyor);
-}
-
-conveyor<void> async_io_stream::on_read_disconnected() {
- auto caf = new_conveyor_and_feeder<void>();
- read_stepper_.on_read_disconnect = std::move(caf.feeder);
- return std::move(caf.conveyor);
-}
-
-void async_io_stream::write(const void *buffer, size_t length) {
- SAW_ASSERT(buffer && length > 0) { return; }
-
- SAW_ASSERT(!write_stepper_.write_task.has_value()) { return; }
-
- write_stepper_.write_task =
- write_task_and_step_helper::write_io_task{buffer, length, 0};
- write_stepper_.write_step(*stream_);
-}
-
-conveyor<size_t> async_io_stream::write_done() {
- auto caf = new_conveyor_and_feeder<size_t>();
- write_stepper_.write_done = std::move(caf.feeder);
- return std::move(caf.conveyor);
-}
string_network_address::string_network_address(const std::string &address,
uint16_t port)
diff --git a/modules/io/c++/io.hpp b/modules/io/c++/io.hpp
index 390ddcc..fccba7f 100644
--- a/modules/io/c++/io.hpp
+++ b/modules/io/c++/io.hpp
@@ -55,6 +55,7 @@ public:
/*
* Io stream
*/
+template<typename T = net::Os>
class io_stream : public input_stream, public output_stream {
public:
virtual ~io_stream() = default;
@@ -79,10 +80,11 @@ public:
virtual conveyor<size_t> write_done() = 0;
};
+template<typename T = net::Os>
class async_io_stream final : public async_input_stream,
public async_output_stream {
private:
- own<io_stream> stream_;
+ own<io_stream<T>> stream_;
conveyor_sink read_ready_;
conveyor_sink write_ready_;
@@ -92,7 +94,7 @@ private:
write_task_and_step_helper write_stepper_;
public:
- async_io_stream(own<io_stream> str);
+ async_io_stream(own<io_stream<T>> str);
SAW_FORBID_COPY(async_io_stream);
SAW_FORBID_MOVE(async_io_stream);
@@ -108,19 +110,22 @@ public:
conveyor<size_t> write_done() override;
};
+template<typename T = net::Os>
class server {
public:
virtual ~server() = default;
- virtual conveyor<own<io_stream>> accept() = 0;
+ virtual conveyor<own<io_stream<T>>> accept() = 0;
};
+template<typename T = net::Os>
class network_address;
/**
* Datagram class. Bound to a local address it is able to receive inbound
* datagram messages and send them as well as long as an address is provided as
* well
*/
+template<typename T = net::Os>
class datagram {
public:
virtual ~datagram() = default;
@@ -129,13 +134,14 @@ public:
virtual conveyor<void> read_ready() = 0;
virtual error_or<size_t> write(const void *buffer, size_t length,
- network_address &dest) = 0;
+ network_address<T> &dest) = 0;
virtual conveyor<void> write_ready() = 0;
};
class os_network_address;
class string_network_address;
+template<typename T>
class network_address {
public:
using child_variant =
@@ -149,14 +155,14 @@ public:
virtual uint16_t port() const = 0;
};
-class os_network_address : public network_address {
+class os_network_address : public network_address<net::Os> {
public:
virtual ~os_network_address() = default;
network_address::child_variant representation() override { return this; }
};
-class string_network_address final : public network_address {
+class string_network_address final : public network_address<net::Os> {
private:
std::string address_value_;
uint16_t port_value_;
@@ -178,10 +184,10 @@ public:
/**
* Resolve the provided string and uint16 to the preferred storage method
*/
- virtual conveyor<own<network_address>>
+ virtual conveyor<own<network_address<T>>>
resolve_address(const std::string &addr, uint16_t port_hint = 0) = 0;
- virtual error_or<own<network_address>>
+ virtual error_or<own<network_address<T>>>
parse_address(const std::string &addr, uint16_t port_hint = 0) = 0;
/**
@@ -195,17 +201,17 @@ public:
/**
* Set up a listener on this address
*/
- virtual own<server> listen(network_address &bind_addr) = 0;
+ virtual own<server<T>> listen(network_address<T> &bind_addr) = 0;
/**
* Connect to a remote address
*/
- virtual conveyor<own<io_stream>> connect(network_address &address) = 0;
+ virtual conveyor<own<io_stream<T>>> connect(network_address<T> &address) = 0;
/**
* Bind a datagram socket at this address.
*/
- virtual own<datagram> datagram(network_address &address) = 0;
+ virtual own<datagram<T>> bind_datagram(network_address<T> &address) = 0;
};
class io_provider {
@@ -222,6 +228,68 @@ struct async_io_context {
event_loop &event_loop;
event_port &event_port;
};
+}
+
+namespace saw {
+template<typename T>
+async_io_stream<T>::async_io_stream(own<io_stream<T>> str)
+ : stream_{std::move(str)},
+ read_ready_{stream_->read_ready()
+ .then([this]() { read_stepper_.read_step(*stream_); })
+ .sink()},
+ write_ready_{stream_->write_ready()
+ .then([this]() { write_stepper_.write_step(*stream_); })
+ .sink()},
+ read_disconnected_{stream_->on_read_disconnected()
+ .then([this]() {
+ if (read_stepper_.on_read_disconnect) {
+ read_stepper_.on_read_disconnect->feed();
+ }
+ })
+ .sink()} {}
+
+template<typename T>
+void async_io_stream<T>::read(void *buffer, size_t min_length, size_t max_length) {
+ SAW_ASSERT(buffer && max_length >= min_length && min_length > 0) { return; }
+
+ SAW_ASSERT(!read_stepper_.read_task.has_value()) { return; }
+
+ read_stepper_.read_task = read_task_and_step_helper::read_io_task{
+ buffer, min_length, max_length, 0};
+ read_stepper_.read_step(*stream_);
+}
+
+template<typename T>
+conveyor<size_t> async_io_stream<T>::read_done() {
+ auto caf = new_conveyor_and_feeder<size_t>();
+ read_stepper_.read_done = std::move(caf.feeder);
+ return std::move(caf.conveyor);
+}
+
+template<typename T>
+conveyor<void> async_io_stream<T>::on_read_disconnected() {
+ auto caf = new_conveyor_and_feeder<void>();
+ read_stepper_.on_read_disconnect = std::move(caf.feeder);
+ return std::move(caf.conveyor);
+}
+
+template<typename T>
+void async_io_stream<T>::write(const void *buffer, size_t length) {
+ SAW_ASSERT(buffer && length > 0) { return; }
+
+ SAW_ASSERT(!write_stepper_.write_task.has_value()) { return; }
+
+ write_stepper_.write_task =
+ write_task_and_step_helper::write_io_task{buffer, length, 0};
+ write_stepper_.write_step(*stream_);
+}
+
+template<typename T>
+conveyor<size_t> async_io_stream<T>::write_done() {
+ auto caf = new_conveyor_and_feeder<size_t>();
+ write_stepper_.write_done = std::move(caf.feeder);
+ return std::move(caf.conveyor);
+}
error_or<async_io_context> setup_async_io();
} // namespace saw
diff --git a/modules/io/c++/io_unix.cpp b/modules/io/c++/io_unix.cpp
index 73e8e3b..1d89e7d 100644
--- a/modules/io/c++/io_unix.cpp
+++ b/modules/io/c++/io_unix.cpp
@@ -264,7 +264,7 @@ public:
ssize_t unix_read(int fd, void *buffer, size_t length);
ssize_t unix_write(int fd, const void *buffer, size_t length);
-class unix_io_stream final : public io_stream, public i_fd_owner {
+class unix_io_stream final : public io_stream<net::Os>, public i_fd_owner {
private:
own<conveyor_feeder<void>> read_ready_ = nullptr;
own<conveyor_feeder<void>> on_read_disconnect_ = nullptr;
@@ -299,19 +299,19 @@ public:
void notify(uint32_t mask) override;
};
-class unix_server final : public server, public i_fd_owner {
+class unix_server final : public server<net::Os>, public i_fd_owner {
private:
- own<conveyor_feeder<own<io_stream>>> accept_feeder_ = nullptr;
+ own<conveyor_feeder<own<io_stream<net::Os>>>> accept_feeder_ = nullptr;
public:
unix_server(unix_event_port &event_port, int file_descriptor, int fd_flags);
- conveyor<own<io_stream>> accept() override;
+ conveyor<own<io_stream<net::Os>>> accept() override;
void notify(uint32_t mask) override;
};
-class unix_datagram final : public datagram, public i_fd_owner {
+class unix_datagram final : public datagram<net::Os>, public i_fd_owner {
private:
own<conveyor_feeder<void>> read_ready_ = nullptr;
own<conveyor_feeder<void>> write_ready_ = nullptr;
@@ -324,7 +324,7 @@ public:
conveyor<void> read_ready() override;
error_or<size_t> write(const void *buffer, size_t length,
- network_address &dest) override;
+ network_address<net::Os> &dest) override;
conveyor<void> write_ready() override;
void notify(uint32_t mask) override;
@@ -437,19 +437,19 @@ private:
public:
unix_network(unix_event_port &event_port);
- conveyor<own<network_address>>
+ conveyor<own<network_address<net::Os>>>
resolve_address(const std::string &address,
uint16_t port_hint = 0) override;
- error_or<own<network_address>>
+ error_or<own<network_address<net::Os>>>
parse_address(const std::string& address,
uint16_t port_hint = 0) override;
- own<server> listen(network_address &addr) override;
+ own<server<net::Os>> listen(network_address<net::Os> &addr) override;
- conveyor<own<io_stream>> connect(network_address &addr) override;
+ conveyor<own<io_stream<net::Os>>> connect(network_address<net::Os> &addr) override;
- own<class datagram> datagram(network_address &addr) override;
+ own<datagram<net::Os>> bind_datagram(network_address<net::Os> &addr) override;
};
class unix_io_provider final : public io_provider {
@@ -462,7 +462,7 @@ private:
public:
unix_io_provider(unix_event_port &port_ref, own<event_port> port);
- class network<net::Os> &get_network() override;
+ network<net::Os> &get_network() override;
own<input_stream> wrap_input_fd(int fd) override;
@@ -564,8 +564,8 @@ unix_server::unix_server(unix_event_port &event_port, int file_descriptor,
int fd_flags)
: i_fd_owner{event_port, file_descriptor, fd_flags, EPOLLIN} {}
-conveyor<own<io_stream>> unix_server::accept() {
- auto caf = new_conveyor_and_feeder<own<io_stream>>();
+conveyor<own<io_stream<net::Os>>> unix_server::accept() {
+ auto caf = new_conveyor_and_feeder<own<io_stream<net::Os>>>();
accept_feeder_ = std::move(caf.feeder);
return std::move(caf.conveyor);
}
@@ -624,7 +624,7 @@ conveyor<void> unix_datagram::read_ready() {
}
error_or<size_t> unix_datagram::write(const void *buffer, size_t length,
- network_address &dest) {
+ network_address<net::Os> &dest) {
unix_network_address &unix_dest = static_cast<unix_network_address &>(dest);
socket_address &sock_addr = unix_dest.unix_address();
socklen_t sock_addr_length = sock_addr.get_raw_length();
@@ -664,7 +664,7 @@ bool begins_with(const std::string_view &viewed,
}
std::variant<unix_network_address, unix_network_address *>
-translate_network_address_to_unix_network_address(network_address &addr) {
+translate_network_address_to_unix_network_address(network_address<net::Os> &addr) {
auto addr_variant = addr.representation();
std::variant<unix_network_address, unix_network_address *> os_addr =
std::visit(
@@ -705,7 +705,7 @@ unix_network_address &translate_to_unix_address_ref(
} // namespace
-own<server> unix_network::listen(network_address &addr) {
+own<server<net::Os>> unix_network::listen(network_address<net::Os> &addr) {
auto unix_addr_storage =
translate_network_address_to_unix_network_address(addr);
unix_network_address &address =
@@ -743,7 +743,7 @@ own<server> unix_network::listen(network_address &addr) {
#include <iostream>
namespace saw { namespace unix {
-conveyor<own<io_stream>> unix_network::connect(network_address &addr) {
+conveyor<own<io_stream<net::Os>>> unix_network::connect(network_address<net::Os> &addr) {
auto unix_addr_storage =
translate_network_address_to_unix_network_address(addr);
unix_network_address &address =
@@ -751,12 +751,12 @@ conveyor<own<io_stream>> unix_network::connect(network_address &addr) {
assert(address.unix_address_size() > 0);
if (address.unix_address_size() == 0) {
- return conveyor<own<io_stream>>{make_error<err::critical>()};
+ return conveyor<own<io_stream<net::Os>>>{make_error<err::critical>()};
}
int fd = address.unix_address(0).socket(SOCK_STREAM);
if (fd < 0) {
- return conveyor<own<io_stream>>{make_error<err::disconnected>()};
+ return conveyor<own<io_stream<net::Os>>>{make_error<err::disconnected>()};
}
own<unix_io_stream> io_str =
@@ -780,12 +780,12 @@ conveyor<own<io_stream>> unix_network::connect(network_address &addr) {
if (error == EINPROGRESS) {
conveyor<void> write_rdy = io_str->write_ready();
- return write_rdy.then([ios = std::move(io_str)] () mutable -> error_or<own<io_stream>> {
+ return write_rdy.then([ios = std::move(io_str)] () mutable -> error_or<own<io_stream<net::Os>>> {
if(!ios){
return make_error<err::invalid_state>("Limit node invalidated");
}
- own<io_stream> mov_ios = std::move(ios);
+ own<io_stream<net::Os>> mov_ios = std::move(ios);
/**
* This guarantees the old async pipe to not be used anymore
*/
@@ -795,7 +795,7 @@ conveyor<own<io_stream>> unix_network::connect(network_address &addr) {
});
} else if (error != EINTR) {
/// @todo Push error message from
- return conveyor<own<io_stream>>{make_error<err::disconnected>()};
+ return conveyor<own<io_stream<net::Os>>>{make_error<err::disconnected>()};
}
} else {
success = true;
@@ -804,13 +804,13 @@ conveyor<own<io_stream>> unix_network::connect(network_address &addr) {
}
if (!success) {
- return conveyor<own<io_stream>>{make_error<err::disconnected>()};
+ return conveyor<own<io_stream<net::Os>>>{make_error<err::disconnected>()};
}
- return conveyor<own<io_stream>>{std::move(io_str)};
+ return conveyor<own<io_stream<net::Os>>>{std::move(io_str)};
}
-own<datagram> unix_network::datagram(network_address &addr) {
+own<datagram<net::Os>> unix_network::bind_datagram(network_address<net::Os> &addr) {
auto unix_addr_storage =
translate_network_address_to_unix_network_address(addr);
unix_network_address &address =
@@ -853,7 +853,7 @@ size_t unix_network_address::unix_address_size() const {
unix_network::unix_network(unix_event_port &event) : event_port_{event} {}
-conveyor<own<network_address>>
+conveyor<own<network_address<net::Os>>>
unix_network::resolve_address(const std::string &path, uint16_t port_hint) {
std::string_view addr_view{path};
{
@@ -866,11 +866,11 @@ unix_network::resolve_address(const std::string &path, uint16_t port_hint) {
std::vector<socket_address> addresses =
socket_address::resolve(addr_view, port_hint);
- return conveyor<own<network_address>>{
+ return conveyor<own<network_address<net::Os>>>{
heap<unix_network_address>(path, port_hint, std::move(addresses))};
}
-error_or<own<network_address>>
+error_or<own<network_address<net::Os>>>
unix_network::parse_address(const std::string& path, uint16_t port_hint){
std::string_view addr_view{path};
{
diff --git a/modules/io/examples/echo_client.cpp b/modules/io/examples/echo_client.cpp
index e827ce0..69d91a2 100644
--- a/modules/io/examples/echo_client.cpp
+++ b/modules/io/examples/echo_client.cpp
@@ -30,8 +30,8 @@ int main(){
keep_running = false;
}).detach();
- saw::own<saw::network_address> net_addr = nullptr;
- saw::own<saw::async_io_stream> async_rmt = nullptr;
+ saw::own<saw::network_address<saw::net::Os>> net_addr = nullptr;
+ saw::own<saw::async_io_stream<saw::net::Os>> async_rmt = nullptr;
std::array<uint8_t, 32> read_data;
uint64_t read_bytes = 0;
@@ -39,7 +39,7 @@ int main(){
network.resolve_address(saw::echo_address, saw::echo_port).then([&](auto addr){
net_addr = std::move(addr);
network.connect(*net_addr).then([&](auto rmt_srv){
- async_rmt = saw::heap<saw::async_io_stream>(std::move(rmt_srv));
+ async_rmt = saw::heap<saw::async_io_stream<saw::net::Os>>(std::move(rmt_srv));
async_rmt->write(&message_content[0], message_content.size());
async_rmt->read(&read_data[0], message_content.size(), read_data.size()-1);
diff --git a/modules/io/examples/echo_server.cpp b/modules/io/examples/echo_server.cpp
index 4336048..0b0ea1b 100644
--- a/modules/io/examples/echo_server.cpp
+++ b/modules/io/examples/echo_server.cpp
@@ -4,7 +4,7 @@
#include "echo.hpp"
-saw::error_or<void> handle_echo_write(saw::io_stream& rmt_clt, saw::message& state, uint64_t tbw){
+saw::error_or<void> handle_echo_write(saw::io_stream<saw::net::Os>& rmt_clt, saw::message& state, uint64_t tbw){
auto eov = rmt_clt.write(&state.data[state.already_written], tbw);
if(eov.is_error()){
return std::move(eov.get_error());
@@ -19,7 +19,7 @@ saw::error_or<void> handle_echo_write(saw::io_stream& rmt_clt, saw::message& sta
return saw::void_t{};
}
-void handle_echo_message(saw::io_stream& rmt_clt, bool& keep_running, saw::message& state){
+void handle_echo_message(saw::io_stream<saw::net::Os>& rmt_clt, bool& keep_running, saw::message& state){
rmt_clt.read_ready().then([&](){
for(;;){
uint64_t tbr = state.data.size() < state.already_read ? 0: state.data.size() - state.already_read;
@@ -113,9 +113,9 @@ int main(){
}).detach();
auto& network = aio.io->get_network();
- saw::own<saw::network_address> addr = nullptr;
- saw::own<saw::server> srv = nullptr;
- saw::own<saw::io_stream> remote_client = nullptr;
+ saw::own<saw::network_address<saw::net::Os>> addr = nullptr;
+ saw::own<saw::server<saw::net::Os>> srv = nullptr;
+ saw::own<saw::io_stream<saw::net::Os>> remote_client = nullptr;
saw::message msg_state;
diff --git a/modules/io_codec/c++/io_peer.hpp b/modules/io_codec/c++/io_peer.hpp
index 0cb3959..d29c7b8 100644
--- a/modules/io_codec/c++/io_peer.hpp
+++ b/modules/io_codec/c++/io_peer.hpp
@@ -10,6 +10,7 @@ namespace saw {
template <typename Incoming, typename Outgoing,
typename TransportEncoding, typename ContentEncoding,
+ typename Net = net::Os,
typename BufferT = ring_buffer>
class streaming_io_peer {
private:
@@ -20,14 +21,14 @@ public:
*/
streaming_io_peer(
own<conveyor_feeder<data<Incoming, ContentEncoding>>> feed,
- own<async_io_stream> stream, transport<TransportEncoding> in_codec);
+ own<async_io_stream<Net>> stream, transport<TransportEncoding> in_codec);
/**
* Constructor with mostly default assignements
*/
streaming_io_peer(
own<conveyor_feeder<data<Incoming, ContentEncoding>>> feed,
- own<async_io_stream> stream);
+ own<async_io_stream<Net>> stream);
/**
* Deleted copy and move constructors
@@ -54,8 +55,8 @@ private:
: public conveyor_feeder<data<Outgoing, ContentEncoding>> {
public:
peer_conveyor_feeder(
- streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, BufferT> &peer_)
- : peer_{peer_} {}
+ streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, Net, BufferT> &peer__)
+ : peer_{peer__} {}
void feed(data<Outgoing, ContentEncoding> &&data_) override {
(void)data_;
@@ -69,7 +70,7 @@ private:
error_or<void> swap(conveyor<data<Outgoing, ContentEncoding>> &&) noexcept override { return make_error<err::not_implemented>();}
private:
- streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding,
+ streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, Net,
BufferT> &peer_;
};
@@ -77,7 +78,7 @@ private:
own<conveyor_feeder<data<Incoming, ContentEncoding>>>
incoming_feeder_ = nullptr;
- own<async_io_stream> io_stream_;
+ own<async_io_stream<Net>> io_stream_;
transport<TransportEncoding> in_codec_;
@@ -99,10 +100,11 @@ private:
*/
template <typename Incoming, typename Outgoing,
typename TransportEncoding, typename ContentEncoding,
+ typename Net = net::Os,
typename BufferT = ring_buffer>
-std::pair<own<streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, BufferT>>,
+std::pair<own<streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, Net, BufferT>>,
conveyor<data<Incoming, ContentEncoding>>>
-new_streaming_io_peer(own<async_io_stream> stream);
+new_streaming_io_peer(own<async_io_stream<Net>> stream);
} // namespace saw
diff --git a/modules/io_codec/c++/io_peer.tmpl.hpp b/modules/io_codec/c++/io_peer.tmpl.hpp
index 0322631..e046f92 100644
--- a/modules/io_codec/c++/io_peer.tmpl.hpp
+++ b/modules/io_codec/c++/io_peer.tmpl.hpp
@@ -1,25 +1,24 @@
namespace saw {
}
-#include <iostream>
namespace saw {
-template <typename Incoming, typename Outgoing, typename TransportEncoding, typename ContentEncoding,
+template <typename Incoming, typename Outgoing, typename TransportEncoding, typename ContentEncoding, typename Net,
typename BufferT>
-streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding,
+streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, Net,
BufferT>::
streaming_io_peer(
own<conveyor_feeder<data<Incoming, ContentEncoding>>> feed,
- own<async_io_stream> str)
+ own<async_io_stream<Net>> str)
: streaming_io_peer{std::move(feed), std::move(str), {}} {}
-template <typename Incoming, typename Outgoing, typename TransportEncoding, typename ContentEncoding, typename BufferT>
-streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding,
+template <typename Incoming, typename Outgoing, typename TransportEncoding, typename ContentEncoding, typename Net, typename BufferT>
+streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, Net,
BufferT>::
streaming_io_peer(
own<conveyor_feeder<data<Incoming, ContentEncoding>>> feed,
- own<async_io_stream> stream, transport<TransportEncoding> in_codec)
+ own<async_io_stream<Net>> stream, transport<TransportEncoding> in_codec)
: incoming_feeder_{std::move(feed)},
io_stream_{std::move(stream)},
in_codec_{std::move(in_codec)},
@@ -99,8 +98,8 @@ streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding,
template <typename Incoming, typename Outgoing, typename TransportEncoding,
- typename ContentEncoding, typename BufferT>
-error_or<void> streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding,
+ typename ContentEncoding, typename Net, typename BufferT>
+error_or<void> streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, Net,
BufferT>::send(data<Outgoing, ContentEncoding>
msg) {
bool restart_write = (out_buffer_.read_segment_length() == 0u);
@@ -132,10 +131,10 @@ error_or<void> streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentE
return void_t{};
}
-template <typename Incoming, typename Outgoing, typename TransportEncoding, typename ContentEncoding,
+template <typename Incoming, typename Outgoing, typename TransportEncoding, typename ContentEncoding, typename Net,
typename BufferT>
conveyor<void>
-streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding,
+streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, Net,
BufferT>::on_disconnected() {
io_read_disconnected_ = io_stream_->on_read_disconnected().then([this](){
if(disconnect_feeder_){
@@ -148,14 +147,14 @@ streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding,
return std::move(caf.conveyor);
}
-template <typename Incoming, typename Outgoing, typename TransportEncoding, typename ContentEncoding, typename BufferT>
-std::pair<own<streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, BufferT>>,
+template <typename Incoming, typename Outgoing, typename TransportEncoding, typename ContentEncoding, typename Net, typename BufferT>
+std::pair<own<streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, Net, BufferT>>,
conveyor<data<Incoming,ContentEncoding>>>
-new_streaming_io_peer(own<async_io_stream> stream) {
+new_streaming_io_peer(own<async_io_stream<Net>> stream) {
auto caf =
new_conveyor_and_feeder<data<Incoming, ContentEncoding>>();
- return {heap<streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding,BufferT>>(
+ return {heap<streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding,Net,BufferT>>(
std::move(caf.feeder), std::move(stream)),
std::move(caf.conveyor)};
}
diff --git a/modules/io_codec/examples/peer_echo_client.cpp b/modules/io_codec/examples/peer_echo_client.cpp
index 942deed..b610372 100644
--- a/modules/io_codec/examples/peer_echo_client.cpp
+++ b/modules/io_codec/examples/peer_echo_client.cpp
@@ -69,13 +69,13 @@ int main(int argc, char** argv){
codec<sch::Echo, encode::KelSimple> simple_codec;
- network.connect(*addr).then([&](saw::own<saw::io_stream> client){
+ network.connect(*addr).then([&](saw::own<saw::io_stream<saw::net::Os>> client){
if(!client){
return;
}
- auto echo_stream = saw::heap<saw::async_io_stream>(std::move(client));
- auto echo_peer_stream_p = saw::new_streaming_io_peer<sch::Echo, sch::Echo, trans::FixedLength<8u>, encode::KelSimple, ring_buffer>(std::move(echo_stream));
+ auto echo_stream = saw::heap<saw::async_io_stream<saw::net::Os>>(std::move(client));
+ auto echo_peer_stream_p = saw::new_streaming_io_peer<sch::Echo, sch::Echo, trans::FixedLength<8u>, encode::KelSimple>(std::move(echo_stream));
std::cout<<"Connected"<<std::endl;
diff --git a/modules/io_codec/examples/peer_echo_server.cpp b/modules/io_codec/examples/peer_echo_server.cpp
index 4d201c0..804d9ad 100644
--- a/modules/io_codec/examples/peer_echo_server.cpp
+++ b/modules/io_codec/examples/peer_echo_server.cpp
@@ -50,13 +50,13 @@ int main(){
return -2;
}
- echo_srv->accept().then([&](saw::own<saw::io_stream> client){
+ echo_srv->accept().then([&](saw::own<saw::io_stream<saw::net::Os>> client){
if(!client){
keep_running = false;
return;
}
- auto echo_stream = saw::heap<saw::async_io_stream>(std::move(client));
- auto echo_peer_stream_p = saw::new_streaming_io_peer<sch::Echo, sch::Echo, trans::FixedLength<8u>, encode::KelSimple, ring_buffer>(std::move(echo_stream));
+ auto echo_stream = saw::heap<saw::async_io_stream<saw::net::Os>>(std::move(client));
+ auto echo_peer_stream_p = saw::new_streaming_io_peer<sch::Echo, sch::Echo, trans::FixedLength<8u>, encode::KelSimple>(std::move(echo_stream));
std::cout<<"Connected client"<<std::endl;
auto peer_str = echo_peer_stream_p.first.get();