summaryrefslogtreecommitdiff
path: root/modules
diff options
context:
space:
mode:
authorClaudius "keldu" Holeksa <mail@keldu.de>2024-03-15 14:41:47 +0100
committerClaudius "keldu" Holeksa <mail@keldu.de>2024-03-15 14:41:47 +0100
commitc0424e7a55250705579ee64c269892677fa86adf (patch)
tree59f15a90bc14dffbecd139043420077a446c6d8e /modules
parentb08872bc9a475559491659f5fea8f45a063cf1bf (diff)
async,io: Fixed immediate issues in async and built a basic io
echo_server
Diffstat (limited to 'modules')
-rw-r--r--modules/async/c++/async.tmpl.hpp9
-rw-r--r--modules/async/tests/immediate.cpp6
-rw-r--r--modules/io/examples/echo.hpp1
-rw-r--r--modules/io/examples/echo_client.cpp2
-rw-r--r--modules/io/examples/echo_server.cpp71
5 files changed, 78 insertions, 11 deletions
diff --git a/modules/async/c++/async.tmpl.hpp b/modules/async/c++/async.tmpl.hpp
index 98573b5..ec8d3fc 100644
--- a/modules/async/c++/async.tmpl.hpp
+++ b/modules/async/c++/async.tmpl.hpp
@@ -285,7 +285,7 @@ template <typename T> size_t immediate_conveyor_node<T>::space() const {
}
template <typename T> size_t immediate_conveyor_node<T>::queued() const {
- return retrieved_ > 1 ? 0 : 1;
+ return retrieved_ > 0 ? 0 : 1;
}
template <typename T> void immediate_conveyor_node<T>::child_has_fired() {
@@ -301,15 +301,10 @@ template <typename T> void immediate_conveyor_node<T>::parent_has_fired() {
arm_next();
}
}
-}
-#include <iostream>
-namespace saw {
+
template <typename T> void immediate_conveyor_node<T>::fire() {
- std::cout<<"Immediate fire"<<std::endl;
if (parent_) {
- std::cout<<"Immediate parent: "<<queued()<<" "<<parent_->space()<<std::endl;
parent_->child_has_fired();
- std::cout<<"Immediate parent2: "<<queued()<<" "<<parent_->space()<<std::endl;
if (queued() > 0 && parent_->space() > 0) {
arm_last();
}
diff --git a/modules/async/tests/immediate.cpp b/modules/async/tests/immediate.cpp
index a5d9b2f..5a59ccd 100644
--- a/modules/async/tests/immediate.cpp
+++ b/modules/async/tests/immediate.cpp
@@ -34,10 +34,11 @@ SAW_TEST("Immediate Conveyor Queueing"){
conveyor<int> immediately_done{val};
int passed = 0;
+ bool has_failed = false;
auto res = immediately_done.then([&passed](int a) {
passed = a;
- }).sink([](auto err){
- SAW_EXPECT(false, "No error should occur. This code path shouldn't be reachable.");
+ }).sink([&](auto err){
+ has_failed = true;
return err;
});
@@ -48,5 +49,6 @@ SAW_TEST("Immediate Conveyor Queueing"){
wait.poll();
SAW_EXPECT(passed == val, "Expected a 5 in passed value.");
+ SAW_EXPECT(!has_failed, "No error should occur in queueing");
}
}
diff --git a/modules/io/examples/echo.hpp b/modules/io/examples/echo.hpp
index 2201457..4eb8084 100644
--- a/modules/io/examples/echo.hpp
+++ b/modules/io/examples/echo.hpp
@@ -3,6 +3,7 @@
struct message {
std::array<uint8_t, 256> data;
uint64_t already_read = 0;
+ uint64_t already_written = 0;
};
constexpr std::string_view message_content = "Hello! This is an echo msg.";
diff --git a/modules/io/examples/echo_client.cpp b/modules/io/examples/echo_client.cpp
index 9e56f6b..ad03779 100644
--- a/modules/io/examples/echo_client.cpp
+++ b/modules/io/examples/echo_client.cpp
@@ -31,5 +31,7 @@ int main(){
wait_scope.wait();
}
+ std::cout<<"Shutting down echo client"<<std::endl;
+
return 0;
}
diff --git a/modules/io/examples/echo_server.cpp b/modules/io/examples/echo_server.cpp
index a158368..50863db 100644
--- a/modules/io/examples/echo_server.cpp
+++ b/modules/io/examples/echo_server.cpp
@@ -4,6 +4,21 @@
#include "echo.hpp"
+saw::error_or<void> handle_echo_write(saw::io_stream& rmt_clt, message& state, uint64_t tbw){
+ auto eov = rmt_clt.write(&state.data[state.already_written], tbw);
+ if(eov.is_error()){
+ return std::move(eov.get_error());
+ }
+
+ auto val = eov.get_value();
+ state.already_written += val;
+ if(state.already_written > state.already_read){
+ exit(-1);
+ }
+
+ return saw::void_t{};
+}
+
void handle_echo_message(saw::io_stream& rmt_clt, bool& keep_running, message& state){
rmt_clt.read_ready().then([&](){
for(;;){
@@ -11,12 +26,61 @@ void handle_echo_message(saw::io_stream& rmt_clt, bool& keep_running, message& s
if(tbr == 0){
exit(-1);
}
- auto eov = rmt_clt.read(&state.data[0], tbr);
+ auto eov = rmt_clt.read(&state.data[state.already_read], tbr);
if(eov.is_error()){
auto& err = eov.get_error();
if(err.is_critical()){
+ std::cerr<<err.get_category()<<std::endl;
exit(err.get_id());
+ }else{
+ break;
+ }
+ }
+ auto read_bytes = eov.get_value();
+ if(read_bytes == 0u){
+ exit(-1);
+ }
+
+ bool trigger_write = (state.already_read == state.already_written);
+
+ state.already_read += read_bytes;
+ if(state.already_read > state.data.size()){
+ state.already_read = state.data.size();
+ }
+ if(trigger_write){
+ auto eov = handle_echo_write(rmt_clt, state, state.already_read - state.already_written);
+ if(eov.is_error()){
+ auto& err = eov.get_error();
+ if(err.is_critical()){
+ std::cerr<<err.get_category()<<std::endl;
+ exit(err.get_id());
+ }else {
+ break;
+ }
+ }
+ }
+ }
+ }).detach();
+
+ rmt_clt.write_ready().then([&](){
+ for(;;){
+ if(state.already_read < state.already_written){
+ exit(-1);
+ }
+ uint64_t tbw = state.already_read - state.already_written;
+ if(tbw == 0){
+ break;
+ }
+
+ auto eov = handle_echo_write(rmt_clt, state, tbw);
+ if(eov.is_error()){
+ auto& err = eov.get_error();
+ if(err.is_critical()){
+ std::cerr<<err.get_category()<<std::endl;
+ exit(err.get_id());
+ }else {
+ break;
}
}
}
@@ -62,10 +126,13 @@ int main(){
srv = network.listen(*addr);
if(srv){
srv->accept().then([&](auto client) -> saw::error_or<void>{
+
if(!remote_client){
+ std::cout<<"Accepted client"<<std::endl;
remote_client = std::move(client);
if(remote_client){
+ std::cout<<"Spinning up handler"<<std::endl;
handle_echo_message(*remote_client, keep_running, msg_state);
}else{
keep_running = false;
@@ -86,7 +153,7 @@ int main(){
wait_scope.poll();
while(keep_running){
- wait_scope.wait(std::chrono::seconds{60});
+ wait_scope.wait(std::chrono::seconds{5});
}
std::cout<<"\n\nShutting down echo server"<<std::endl;