diff options
-rw-r--r-- | modules/io/c++/io_unix.cpp | 30 | ||||
-rw-r--r-- | modules/io_codec/c++/io_peer.hpp | 5 | ||||
-rw-r--r-- | modules/io_codec/c++/io_peer.tmpl.hpp | 66 | ||||
-rw-r--r-- | modules/io_codec/examples/peer_echo_client.cpp | 4 | ||||
-rw-r--r-- | modules/io_codec/examples/peer_echo_server.cpp | 3 |
5 files changed, 45 insertions, 63 deletions
diff --git a/modules/io/c++/io_unix.cpp b/modules/io/c++/io_unix.cpp index 5a8da3a..2c6cf00 100644 --- a/modules/io/c++/io_unix.cpp +++ b/modules/io/c++/io_unix.cpp @@ -738,6 +738,10 @@ own<server> unix_network::listen(network_address &addr) { return heap<unix_server>(event_port_, fd, 0); } +}} +#include <iostream> +namespace saw { namespace unix { + conveyor<own<io_stream>> unix_network::connect(network_address &addr) { auto unix_addr_storage = translate_network_address_to_unix_network_address(addr); @@ -762,6 +766,7 @@ conveyor<own<io_stream>> unix_network::connect(network_address &addr) { socket_address &addr_iter = address.unix_address(i); int status = ::connect(fd, addr_iter.get_raw(), addr_iter.get_raw_length()); + if (status < 0) { int error = errno; /* @@ -772,16 +777,21 @@ conveyor<own<io_stream>> unix_network::connect(network_address &addr) { /// @todo Add limit node when implemented if (error == EINPROGRESS) { - /* - Conveyor<void> write_ready = io_stream->writeReady(); - return write_ready.then( - [ios{std::move(io_stream)}]() mutable { - ios->write_ready = nullptr; - return std::move(ios); - }); - */ - success = true; - break; + conveyor<void> write_rdy = io_str->write_ready(); + + return write_rdy.then([ios = std::move(io_str)] () mutable -> error_or<own<io_stream>> { + + if(!ios){ + return make_error<err::invalid_state>("Limit node invalidated"); + } + own<io_stream> mov_ios = std::move(ios); + /** + * This guarantees the old async pipe to not be used anymore + */ + mov_ios->write_ready(); + + return std::move(mov_ios); + }); } else if (error != EINTR) { /// @todo Push error message from return conveyor<own<io_stream>>{make_error<err::disconnected>()}; diff --git a/modules/io_codec/c++/io_peer.hpp b/modules/io_codec/c++/io_peer.hpp index f8b986f..bd0af69 100644 --- a/modules/io_codec/c++/io_peer.hpp +++ b/modules/io_codec/c++/io_peer.hpp @@ -45,7 +45,7 @@ public: */ conveyor_feeder<data<Outgoing, ContentEncoding>> &feeder(); - conveyor<void> on_read_disconnected(); + conveyor<void> on_disconnected(); private: /// @unimplemented @@ -88,6 +88,9 @@ private: conveyor_sink sink_write_; peer_conveyor_feeder conveyor_feeder_; + + conveyor_sink io_read_disconnected_; + own<conveyor_feeder<void>> disconnect_feeder_ = nullptr; }; /** diff --git a/modules/io_codec/c++/io_peer.tmpl.hpp b/modules/io_codec/c++/io_peer.tmpl.hpp index 967a8ab..0322631 100644 --- a/modules/io_codec/c++/io_peer.tmpl.hpp +++ b/modules/io_codec/c++/io_peer.tmpl.hpp @@ -29,29 +29,8 @@ streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, io_stream_->read_done() .then([this](size_t bytes) -> error_or<void> { - std::cout<<"Read done start: "<<bytes<<std::endl; in_buffer_.write_advance(bytes); - std::cout<<"buff state: "; - std::cout<<in_buffer_.read_composite_length(); - std::cout<<" "; - std::cout<<in_buffer_.read_segment_length(); - std::cout<<" "; - std::cout<<in_buffer_.write_composite_length(); - std::cout<<" "; - std::cout<<in_buffer_.write_segment_length(); - std::cout<<std::endl; - - std::cout<<"buff state: "; - std::cout<<out_buffer_.read_composite_length(); - std::cout<<" "; - std::cout<<out_buffer_.read_segment_length(); - std::cout<<" "; - std::cout<<out_buffer_.write_composite_length(); - std::cout<<" "; - std::cout<<out_buffer_.write_segment_length(); - std::cout<<std::endl; - if (in_buffer_.write_segment_length() == 0) { return make_error<err::critical>("Message too long"); } @@ -82,26 +61,6 @@ streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, in_buffer_.read_advance(len_val); in_buff->write_advance(len_val); - std::cout<<"read buff state: "; - std::cout<<in_buffer_.read_composite_length(); - std::cout<<" "; - std::cout<<in_buffer_.read_segment_length(); - std::cout<<" "; - std::cout<<in_buffer_.write_composite_length(); - std::cout<<" "; - std::cout<<in_buffer_.write_segment_length(); - std::cout<<std::endl; - - std::cout<<"write buff state: "; - std::cout<<out_buffer_.read_composite_length(); - std::cout<<" "; - std::cout<<out_buffer_.read_segment_length(); - std::cout<<" "; - std::cout<<out_buffer_.write_composite_length(); - std::cout<<" "; - std::cout<<out_buffer_.write_segment_length(); - std::cout<<std::endl; - data<Incoming, ContentEncoding> in_data{std::move(in_buff)}; incoming_feeder_->feed(std::move(in_data)); } @@ -123,7 +82,12 @@ streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, } return void_t{}; - }) + },[this](error err){ + if(disconnect_feeder_){ + disconnect_feeder_->feed(); + } + return err; + }) .sink() }, conveyor_feeder_{ @@ -139,10 +103,8 @@ template <typename Incoming, typename Outgoing, typename TransportEncoding, error_or<void> streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, BufferT>::send(data<Outgoing, ContentEncoding> msg) { - std::cout<<"O: "<<out_buffer_.read_segment_length()<<std::endl; bool restart_write = (out_buffer_.read_segment_length() == 0u); - std::cout<<"A: "<<out_buffer_.read_segment_length()<<std::endl; auto& msg_buff = msg.get_buffer(); { @@ -153,7 +115,6 @@ error_or<void> streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentE } // auto& len_val = eov.get_value(); } - std::cout<<"B"<<std::endl; auto eov = out_buffer_.write_from(msg_buff); if (eov.is_error()) { @@ -162,14 +123,11 @@ error_or<void> streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentE } auto& len_val = eov.get_value(); out_buffer_.write_advance(len_val); - std::cout<<"C"<<std::endl; if (restart_write && out_buffer_.read_segment_length() > 0u) { - std::cout<<"D"<<std::endl; io_stream_->write(&out_buffer_.read(), out_buffer_.read_segment_length()); } - std::cout<<"E"<<std::endl; return void_t{}; } @@ -178,8 +136,16 @@ template <typename Incoming, typename Outgoing, typename TransportEncoding, type typename BufferT> conveyor<void> streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding, - BufferT>::on_read_disconnected() { - return io_stream_->on_read_disconnected(); + BufferT>::on_disconnected() { + io_read_disconnected_ = io_stream_->on_read_disconnected().then([this](){ + if(disconnect_feeder_){ + disconnect_feeder_->feed(); + } + }).sink(); + + auto caf = new_conveyor_and_feeder<void>(); + disconnect_feeder_ = std::move(caf.feeder); + return std::move(caf.conveyor); } template <typename Incoming, typename Outgoing, typename TransportEncoding, typename ContentEncoding, typename BufferT> diff --git a/modules/io_codec/examples/peer_echo_client.cpp b/modules/io_codec/examples/peer_echo_client.cpp index d923bc7..3ae67ca 100644 --- a/modules/io_codec/examples/peer_echo_client.cpp +++ b/modules/io_codec/examples/peer_echo_client.cpp @@ -76,7 +76,9 @@ int main(){ }).detach(); auto peer_str = echo_peer_stream_p.first.get(); - peer_str->on_read_disconnected().attach(std::move(echo_peer_stream_p.first)).then([]() -> error_or<void>{ + peer_str->on_disconnected().attach(std::move(echo_peer_stream_p.first)).then([&]() -> error_or<void>{ + keep_running = false; + std::cout<<"disconnect"<<std::endl; return make_error<err::critical>("Destroy pipeline on purpose :>"); }).detach(); diff --git a/modules/io_codec/examples/peer_echo_server.cpp b/modules/io_codec/examples/peer_echo_server.cpp index 87bf721..d579622 100644 --- a/modules/io_codec/examples/peer_echo_server.cpp +++ b/modules/io_codec/examples/peer_echo_server.cpp @@ -86,7 +86,8 @@ int main(){ return make_void(); }).detach(); - peer_str->on_read_disconnected().attach(std::move(echo_peer_stream_p.first)).then([]() -> error_or<void>{ + peer_str->on_disconnected().attach(std::move(echo_peer_stream_p.first)).then([]() -> error_or<void>{ + std::cout<<"Disconnected client"<<std::endl; return make_error<err::critical>("Destroy pipeline on purpose :>"); }).detach(); }).detach(); |