summaryrefslogtreecommitdiff
path: root/modules/io_codec
diff options
context:
space:
mode:
authorClaudius 'keldu' Holeksa <mail@keldu.de>2024-08-09 18:26:48 +0200
committerClaudius 'keldu' Holeksa <mail@keldu.de>2024-08-09 18:26:48 +0200
commit17431a0c95558ed61f092fa019231df89677ca0f (patch)
treeff4934738e7a4975af552f1da4ee7343c9ec09ed /modules/io_codec
parenta64b8346f39a34ef811b679bbed8131e2098e546 (diff)
wip
Diffstat (limited to 'modules/io_codec')
-rw-r--r--modules/io_codec/c++/io_peer.hpp5
-rw-r--r--modules/io_codec/c++/io_peer.tmpl.hpp66
-rw-r--r--modules/io_codec/examples/peer_echo_client.cpp4
-rw-r--r--modules/io_codec/examples/peer_echo_server.cpp3
4 files changed, 25 insertions, 53 deletions
diff --git a/modules/io_codec/c++/io_peer.hpp b/modules/io_codec/c++/io_peer.hpp
index f8b986f..bd0af69 100644
--- a/modules/io_codec/c++/io_peer.hpp
+++ b/modules/io_codec/c++/io_peer.hpp
@@ -45,7 +45,7 @@ public:
*/
conveyor_feeder<data<Outgoing, ContentEncoding>> &feeder();
- conveyor<void> on_read_disconnected();
+ conveyor<void> on_disconnected();
private:
/// @unimplemented
@@ -88,6 +88,9 @@ private:
conveyor_sink sink_write_;
peer_conveyor_feeder conveyor_feeder_;
+
+ conveyor_sink io_read_disconnected_;
+ own<conveyor_feeder<void>> disconnect_feeder_ = nullptr;
};
/**
diff --git a/modules/io_codec/c++/io_peer.tmpl.hpp b/modules/io_codec/c++/io_peer.tmpl.hpp
index 967a8ab..0322631 100644
--- a/modules/io_codec/c++/io_peer.tmpl.hpp
+++ b/modules/io_codec/c++/io_peer.tmpl.hpp
@@ -29,29 +29,8 @@ streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding,
io_stream_->read_done()
.then([this](size_t bytes) -> error_or<void> {
- std::cout<<"Read done start: "<<bytes<<std::endl;
in_buffer_.write_advance(bytes);
- std::cout<<"buff state: ";
- std::cout<<in_buffer_.read_composite_length();
- std::cout<<" ";
- std::cout<<in_buffer_.read_segment_length();
- std::cout<<" ";
- std::cout<<in_buffer_.write_composite_length();
- std::cout<<" ";
- std::cout<<in_buffer_.write_segment_length();
- std::cout<<std::endl;
-
- std::cout<<"buff state: ";
- std::cout<<out_buffer_.read_composite_length();
- std::cout<<" ";
- std::cout<<out_buffer_.read_segment_length();
- std::cout<<" ";
- std::cout<<out_buffer_.write_composite_length();
- std::cout<<" ";
- std::cout<<out_buffer_.write_segment_length();
- std::cout<<std::endl;
-
if (in_buffer_.write_segment_length() == 0) {
return make_error<err::critical>("Message too long");
}
@@ -82,26 +61,6 @@ streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding,
in_buffer_.read_advance(len_val);
in_buff->write_advance(len_val);
- std::cout<<"read buff state: ";
- std::cout<<in_buffer_.read_composite_length();
- std::cout<<" ";
- std::cout<<in_buffer_.read_segment_length();
- std::cout<<" ";
- std::cout<<in_buffer_.write_composite_length();
- std::cout<<" ";
- std::cout<<in_buffer_.write_segment_length();
- std::cout<<std::endl;
-
- std::cout<<"write buff state: ";
- std::cout<<out_buffer_.read_composite_length();
- std::cout<<" ";
- std::cout<<out_buffer_.read_segment_length();
- std::cout<<" ";
- std::cout<<out_buffer_.write_composite_length();
- std::cout<<" ";
- std::cout<<out_buffer_.write_segment_length();
- std::cout<<std::endl;
-
data<Incoming, ContentEncoding> in_data{std::move(in_buff)};
incoming_feeder_->feed(std::move(in_data));
}
@@ -123,7 +82,12 @@ streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding,
}
return void_t{};
- })
+ },[this](error err){
+ if(disconnect_feeder_){
+ disconnect_feeder_->feed();
+ }
+ return err;
+ })
.sink()
},
conveyor_feeder_{
@@ -139,10 +103,8 @@ template <typename Incoming, typename Outgoing, typename TransportEncoding,
error_or<void> streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding,
BufferT>::send(data<Outgoing, ContentEncoding>
msg) {
- std::cout<<"O: "<<out_buffer_.read_segment_length()<<std::endl;
bool restart_write = (out_buffer_.read_segment_length() == 0u);
- std::cout<<"A: "<<out_buffer_.read_segment_length()<<std::endl;
auto& msg_buff = msg.get_buffer();
{
@@ -153,7 +115,6 @@ error_or<void> streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentE
}
// auto& len_val = eov.get_value();
}
- std::cout<<"B"<<std::endl;
auto eov = out_buffer_.write_from(msg_buff);
if (eov.is_error()) {
@@ -162,14 +123,11 @@ error_or<void> streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentE
}
auto& len_val = eov.get_value();
out_buffer_.write_advance(len_val);
- std::cout<<"C"<<std::endl;
if (restart_write && out_buffer_.read_segment_length() > 0u) {
- std::cout<<"D"<<std::endl;
io_stream_->write(&out_buffer_.read(),
out_buffer_.read_segment_length());
}
- std::cout<<"E"<<std::endl;
return void_t{};
}
@@ -178,8 +136,16 @@ template <typename Incoming, typename Outgoing, typename TransportEncoding, type
typename BufferT>
conveyor<void>
streaming_io_peer<Incoming, Outgoing, TransportEncoding, ContentEncoding,
- BufferT>::on_read_disconnected() {
- return io_stream_->on_read_disconnected();
+ BufferT>::on_disconnected() {
+ io_read_disconnected_ = io_stream_->on_read_disconnected().then([this](){
+ if(disconnect_feeder_){
+ disconnect_feeder_->feed();
+ }
+ }).sink();
+
+ auto caf = new_conveyor_and_feeder<void>();
+ disconnect_feeder_ = std::move(caf.feeder);
+ return std::move(caf.conveyor);
}
template <typename Incoming, typename Outgoing, typename TransportEncoding, typename ContentEncoding, typename BufferT>
diff --git a/modules/io_codec/examples/peer_echo_client.cpp b/modules/io_codec/examples/peer_echo_client.cpp
index d923bc7..3ae67ca 100644
--- a/modules/io_codec/examples/peer_echo_client.cpp
+++ b/modules/io_codec/examples/peer_echo_client.cpp
@@ -76,7 +76,9 @@ int main(){
}).detach();
auto peer_str = echo_peer_stream_p.first.get();
- peer_str->on_read_disconnected().attach(std::move(echo_peer_stream_p.first)).then([]() -> error_or<void>{
+ peer_str->on_disconnected().attach(std::move(echo_peer_stream_p.first)).then([&]() -> error_or<void>{
+ keep_running = false;
+ std::cout<<"disconnect"<<std::endl;
return make_error<err::critical>("Destroy pipeline on purpose :>");
}).detach();
diff --git a/modules/io_codec/examples/peer_echo_server.cpp b/modules/io_codec/examples/peer_echo_server.cpp
index 87bf721..d579622 100644
--- a/modules/io_codec/examples/peer_echo_server.cpp
+++ b/modules/io_codec/examples/peer_echo_server.cpp
@@ -86,7 +86,8 @@ int main(){
return make_void();
}).detach();
- peer_str->on_read_disconnected().attach(std::move(echo_peer_stream_p.first)).then([]() -> error_or<void>{
+ peer_str->on_disconnected().attach(std::move(echo_peer_stream_p.first)).then([]() -> error_or<void>{
+ std::cout<<"Disconnected client"<<std::endl;
return make_error<err::critical>("Destroy pipeline on purpose :>");
}).detach();
}).detach();