diff options
author | Claudius "keldu" Holeksa <mail@keldu.de> | 2024-03-15 14:41:47 +0100 |
---|---|---|
committer | Claudius "keldu" Holeksa <mail@keldu.de> | 2024-03-15 14:41:47 +0100 |
commit | c0424e7a55250705579ee64c269892677fa86adf (patch) | |
tree | 59f15a90bc14dffbecd139043420077a446c6d8e /modules | |
parent | b08872bc9a475559491659f5fea8f45a063cf1bf (diff) |
async,io: Fixed immediate issues in async and built a basic io
echo_server
Diffstat (limited to 'modules')
-rw-r--r-- | modules/async/c++/async.tmpl.hpp | 9 | ||||
-rw-r--r-- | modules/async/tests/immediate.cpp | 6 | ||||
-rw-r--r-- | modules/io/examples/echo.hpp | 1 | ||||
-rw-r--r-- | modules/io/examples/echo_client.cpp | 2 | ||||
-rw-r--r-- | modules/io/examples/echo_server.cpp | 71 |
5 files changed, 78 insertions, 11 deletions
diff --git a/modules/async/c++/async.tmpl.hpp b/modules/async/c++/async.tmpl.hpp index 98573b5..ec8d3fc 100644 --- a/modules/async/c++/async.tmpl.hpp +++ b/modules/async/c++/async.tmpl.hpp @@ -285,7 +285,7 @@ template <typename T> size_t immediate_conveyor_node<T>::space() const { } template <typename T> size_t immediate_conveyor_node<T>::queued() const { - return retrieved_ > 1 ? 0 : 1; + return retrieved_ > 0 ? 0 : 1; } template <typename T> void immediate_conveyor_node<T>::child_has_fired() { @@ -301,15 +301,10 @@ template <typename T> void immediate_conveyor_node<T>::parent_has_fired() { arm_next(); } } -} -#include <iostream> -namespace saw { + template <typename T> void immediate_conveyor_node<T>::fire() { - std::cout<<"Immediate fire"<<std::endl; if (parent_) { - std::cout<<"Immediate parent: "<<queued()<<" "<<parent_->space()<<std::endl; parent_->child_has_fired(); - std::cout<<"Immediate parent2: "<<queued()<<" "<<parent_->space()<<std::endl; if (queued() > 0 && parent_->space() > 0) { arm_last(); } diff --git a/modules/async/tests/immediate.cpp b/modules/async/tests/immediate.cpp index a5d9b2f..5a59ccd 100644 --- a/modules/async/tests/immediate.cpp +++ b/modules/async/tests/immediate.cpp @@ -34,10 +34,11 @@ SAW_TEST("Immediate Conveyor Queueing"){ conveyor<int> immediately_done{val}; int passed = 0; + bool has_failed = false; auto res = immediately_done.then([&passed](int a) { passed = a; - }).sink([](auto err){ - SAW_EXPECT(false, "No error should occur. This code path shouldn't be reachable."); + }).sink([&](auto err){ + has_failed = true; return err; }); @@ -48,5 +49,6 @@ SAW_TEST("Immediate Conveyor Queueing"){ wait.poll(); SAW_EXPECT(passed == val, "Expected a 5 in passed value."); + SAW_EXPECT(!has_failed, "No error should occur in queueing"); } } diff --git a/modules/io/examples/echo.hpp b/modules/io/examples/echo.hpp index 2201457..4eb8084 100644 --- a/modules/io/examples/echo.hpp +++ b/modules/io/examples/echo.hpp @@ -3,6 +3,7 @@ struct message { std::array<uint8_t, 256> data; uint64_t already_read = 0; + uint64_t already_written = 0; }; constexpr std::string_view message_content = "Hello! This is an echo msg."; diff --git a/modules/io/examples/echo_client.cpp b/modules/io/examples/echo_client.cpp index 9e56f6b..ad03779 100644 --- a/modules/io/examples/echo_client.cpp +++ b/modules/io/examples/echo_client.cpp @@ -31,5 +31,7 @@ int main(){ wait_scope.wait(); } + std::cout<<"Shutting down echo client"<<std::endl; + return 0; } diff --git a/modules/io/examples/echo_server.cpp b/modules/io/examples/echo_server.cpp index a158368..50863db 100644 --- a/modules/io/examples/echo_server.cpp +++ b/modules/io/examples/echo_server.cpp @@ -4,6 +4,21 @@ #include "echo.hpp" +saw::error_or<void> handle_echo_write(saw::io_stream& rmt_clt, message& state, uint64_t tbw){ + auto eov = rmt_clt.write(&state.data[state.already_written], tbw); + if(eov.is_error()){ + return std::move(eov.get_error()); + } + + auto val = eov.get_value(); + state.already_written += val; + if(state.already_written > state.already_read){ + exit(-1); + } + + return saw::void_t{}; +} + void handle_echo_message(saw::io_stream& rmt_clt, bool& keep_running, message& state){ rmt_clt.read_ready().then([&](){ for(;;){ @@ -11,12 +26,61 @@ void handle_echo_message(saw::io_stream& rmt_clt, bool& keep_running, message& s if(tbr == 0){ exit(-1); } - auto eov = rmt_clt.read(&state.data[0], tbr); + auto eov = rmt_clt.read(&state.data[state.already_read], tbr); if(eov.is_error()){ auto& err = eov.get_error(); if(err.is_critical()){ + std::cerr<<err.get_category()<<std::endl; exit(err.get_id()); + }else{ + break; + } + } + auto read_bytes = eov.get_value(); + if(read_bytes == 0u){ + exit(-1); + } + + bool trigger_write = (state.already_read == state.already_written); + + state.already_read += read_bytes; + if(state.already_read > state.data.size()){ + state.already_read = state.data.size(); + } + if(trigger_write){ + auto eov = handle_echo_write(rmt_clt, state, state.already_read - state.already_written); + if(eov.is_error()){ + auto& err = eov.get_error(); + if(err.is_critical()){ + std::cerr<<err.get_category()<<std::endl; + exit(err.get_id()); + }else { + break; + } + } + } + } + }).detach(); + + rmt_clt.write_ready().then([&](){ + for(;;){ + if(state.already_read < state.already_written){ + exit(-1); + } + uint64_t tbw = state.already_read - state.already_written; + if(tbw == 0){ + break; + } + + auto eov = handle_echo_write(rmt_clt, state, tbw); + if(eov.is_error()){ + auto& err = eov.get_error(); + if(err.is_critical()){ + std::cerr<<err.get_category()<<std::endl; + exit(err.get_id()); + }else { + break; } } } @@ -62,10 +126,13 @@ int main(){ srv = network.listen(*addr); if(srv){ srv->accept().then([&](auto client) -> saw::error_or<void>{ + if(!remote_client){ + std::cout<<"Accepted client"<<std::endl; remote_client = std::move(client); if(remote_client){ + std::cout<<"Spinning up handler"<<std::endl; handle_echo_message(*remote_client, keep_running, msg_state); }else{ keep_running = false; @@ -86,7 +153,7 @@ int main(){ wait_scope.poll(); while(keep_running){ - wait_scope.wait(std::chrono::seconds{60}); + wait_scope.wait(std::chrono::seconds{5}); } std::cout<<"\n\nShutting down echo server"<<std::endl; |