summaryrefslogtreecommitdiff
path: root/modules
diff options
context:
space:
mode:
Diffstat (limited to 'modules')
-rw-r--r--modules/io/c++/io_unix.cpp30
-rw-r--r--modules/io_codec/c++/io_peer.hpp5
-rw-r--r--modules/io_codec/c++/io_peer.tmpl.hpp66
-rw-r--r--modules/io_codec/examples/peer_echo_client.cpp4
-rw-r--r--modules/io_codec/examples/peer_echo_server.cpp3
5 files changed, 45 insertions, 63 deletions
diff --git a/modules/io/c++/io_unix.cpp b/modules/io/c++/io_unix.cpp
index 5a8da3a..2c6cf00 100644
--- a/modules/io/c++/io_unix.cpp
+++ b/modules/io/c++/io_unix.cpp
@@ -738,6 +738,10 @@ own<server> unix_network::listen(network_address &addr) {
return heap<unix_server>(event_port_, fd, 0);
}
+}}
+#include <iostream>
+namespace saw { namespace unix {
+
conveyor<own<io_stream>> unix_network::connect(network_address &addr) {
auto unix_addr_storage =
translate_network_address_to_unix_network_address(addr);
@@ -762,6 +766,7 @@ conveyor<own<io_stream>> unix_network::connect(network_address &addr) {
socket_address &addr_iter = address.unix_address(i);
int status =
::connect(fd, addr_iter.get_raw(), addr_iter.get_raw_length());
+
if (status < 0) {
int error = errno;
/*
@@ -772,16 +777,21 @@ conveyor<own<io_stream>> unix_network::connect(network_address &addr) {
/// @todo Add limit node when implemented
if (error == EINPROGRESS) {
- /*
- Conveyor<void> write_ready = io_stream->writeReady();
- return write_ready.then(
- [ios{std::move(io_stream)}]() mutable {
- ios->write_ready = nullptr;
- return std::move(ios);
- });
- */
- success = true;
- break;
+ conveyor<void> write_rdy = io_str->write_ready();
+
+ return write_rdy.then([ios = std::move(io_str)] () mutable -> error_or<own<io_stream>> {
+
+ if(!ios){
+ return make_error<err::invalid_state>("Limit node invalidated");
+ }
+ own<io_stream> mov_ios = std::move(ios);
+ /**
+ * This guarantees the old async pipe to not be used anymore
+ */
+ mov_ios->write_ready();
+
+ return std::move(mov_ios);
+ });
} else if (error != EINTR) {
/// @todo Push error message from
return conveyor<own<io_stream>>{make_error<err::disconnected>()};
diff --git a/modules/io_codec/c++/io_peer.hpp b/modules/io_codec/c++/io_peer.hpp
index f8b986f..bd0af69 100644
--- a/modules/io_codec/c++/io_peer.hpp
+++ b/modules/io_codec/c++/io_peer.hpp
@@ -45,7 +45,7 @@ public:
*/
conveyor_feeder<data<Outgoing, ContentEncoding>> &feeder();
- conveyor<void> on_read_disconnected();
+ conveyor<void> on_disconnected();
private:
/// @unimplemented
@@ -88,6 +88,9 @@ private:
conveyor_sink sink_write_;
peer_conveyor_feeder conveyor_feeder_;
+
+ conveyor_sink io_read_disconnected_;
+ own<conveyor_feeder<void>> disconnect_feeder_ = nullptr;
};
/**
diff --git a/modules/io_codec/c++/io_peer.tmpl.hpp b/modules/io_codec/c++/io_peer.tmpl.hpp
index 967a8ab..0322631 100644
--- a/modules/io_codec/c++/io_peer.tmpl.hpp
+++ b/modules/io_codec/c++/io_peer.tmpl.hpp
@@ -29,29 +29,8 @@ streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding,
io_stream_->read_done()
.then([this](size_t bytes) -> error_or<void> {
- std::cout<<"Read done start: "<<bytes<<std::endl;
in_buffer_.write_advance(bytes);
- std::cout<<"buff state: ";
- std::cout<<in_buffer_.read_composite_length();
- std::cout<<" ";
- std::cout<<in_buffer_.read_segment_length();
- std::cout<<" ";
- std::cout<<in_buffer_.write_composite_length();
- std::cout<<" ";
- std::cout<<in_buffer_.write_segment_length();
- std::cout<<std::endl;
-
- std::cout<<"buff state: ";
- std::cout<<out_buffer_.read_composite_length();
- std::cout<<" ";
- std::cout<<out_buffer_.read_segment_length();
- std::cout<<" ";
- std::cout<<out_buffer_.write_composite_length();
- std::cout<<" ";
- std::cout<<out_buffer_.write_segment_length();
- std::cout<<std::endl;
-
if (in_buffer_.write_segment_length() == 0) {
return make_error<err::critical>("Message too long");
}
@@ -82,26 +61,6 @@ streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding,
in_buffer_.read_advance(len_val);
in_buff->write_advance(len_val);
- std::cout<<"read buff state: ";
- std::cout<<in_buffer_.read_composite_length();
- std::cout<<" ";
- std::cout<<in_buffer_.read_segment_length();
- std::cout<<" ";
- std::cout<<in_buffer_.write_composite_length();
- std::cout<<" ";
- std::cout<<in_buffer_.write_segment_length();
- std::cout<<std::endl;
-
- std::cout<<"write buff state: ";
- std::cout<<out_buffer_.read_composite_length();
- std::cout<<" ";
- std::cout<<out_buffer_.read_segment_length();
- std::cout<<" ";
- std::cout<<out_buffer_.write_composite_length();
- std::cout<<" ";
- std::cout<<out_buffer_.write_segment_length();
- std::cout<<std::endl;
-
data<Incoming, ContentEncoding> in_data{std::move(in_buff)};
incoming_feeder_->feed(std::move(in_data));
}
@@ -123,7 +82,12 @@ streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding,
}
return void_t{};
- })
+ },[this](error err){
+ if(disconnect_feeder_){
+ disconnect_feeder_->feed();
+ }
+ return err;
+ })
.sink()
},
conveyor_feeder_{
@@ -139,10 +103,8 @@ template <typename Incoming, typename Outgoing, typename TransportEncoding,
error_or<void> streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding,
BufferT>::send(data<Outgoing, ContentEncoding>
msg) {
- std::cout<<"O: "<<out_buffer_.read_segment_length()<<std::endl;
bool restart_write = (out_buffer_.read_segment_length() == 0u);
- std::cout<<"A: "<<out_buffer_.read_segment_length()<<std::endl;
auto& msg_buff = msg.get_buffer();
{
@@ -153,7 +115,6 @@ error_or<void> streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentE
}
// auto& len_val = eov.get_value();
}
- std::cout<<"B"<<std::endl;
auto eov = out_buffer_.write_from(msg_buff);
if (eov.is_error()) {
@@ -162,14 +123,11 @@ error_or<void> streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentE
}
auto& len_val = eov.get_value();
out_buffer_.write_advance(len_val);
- std::cout<<"C"<<std::endl;
if (restart_write && out_buffer_.read_segment_length() > 0u) {
- std::cout<<"D"<<std::endl;
io_stream_->write(&out_buffer_.read(),
out_buffer_.read_segment_length());
}
- std::cout<<"E"<<std::endl;
return void_t{};
}
@@ -178,8 +136,16 @@ template <typename Incoming, typename Outgoing, typename TransportEncoding, type
typename BufferT>
conveyor<void>
streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding,
- BufferT>::on_read_disconnected() {
- return io_stream_->on_read_disconnected();
+ BufferT>::on_disconnected() {
+ io_read_disconnected_ = io_stream_->on_read_disconnected().then([this](){
+ if(disconnect_feeder_){
+ disconnect_feeder_->feed();
+ }
+ }).sink();
+
+ auto caf = new_conveyor_and_feeder<void>();
+ disconnect_feeder_ = std::move(caf.feeder);
+ return std::move(caf.conveyor);
}
template <typename Incoming, typename Outgoing, typename TransportEncoding, typename ContentEncoding, typename BufferT>
diff --git a/modules/io_codec/examples/peer_echo_client.cpp b/modules/io_codec/examples/peer_echo_client.cpp
index d923bc7..3ae67ca 100644
--- a/modules/io_codec/examples/peer_echo_client.cpp
+++ b/modules/io_codec/examples/peer_echo_client.cpp
@@ -76,7 +76,9 @@ int main(){
}).detach();
auto peer_str = echo_peer_stream_p.first.get();
- peer_str->on_read_disconnected().attach(std::move(echo_peer_stream_p.first)).then([]() -> error_or<void>{
+ peer_str->on_disconnected().attach(std::move(echo_peer_stream_p.first)).then([&]() -> error_or<void>{
+ keep_running = false;
+ std::cout<<"disconnect"<<std::endl;
return make_error<err::critical>("Destroy pipeline on purpose :>");
}).detach();
diff --git a/modules/io_codec/examples/peer_echo_server.cpp b/modules/io_codec/examples/peer_echo_server.cpp
index 87bf721..d579622 100644
--- a/modules/io_codec/examples/peer_echo_server.cpp
+++ b/modules/io_codec/examples/peer_echo_server.cpp
@@ -86,7 +86,8 @@ int main(){
return make_void();
}).detach();
- peer_str->on_read_disconnected().attach(std::move(echo_peer_stream_p.first)).then([]() -> error_or<void>{
+ peer_str->on_disconnected().attach(std::move(echo_peer_stream_p.first)).then([]() -> error_or<void>{
+ std::cout<<"Disconnected client"<<std::endl;
return make_error<err::critical>("Destroy pipeline on purpose :>");
}).detach();
}).detach();