summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--default.nix19
-rw-r--r--forstio/SConscript1
-rw-r--r--forstio/async/.nix/derivation.nix31
-rw-r--r--forstio/async/SConscript32
-rw-r--r--forstio/async/SConstruct72
-rw-r--r--forstio/async/async.cpp419
-rw-r--r--forstio/async/async.h1023
-rw-r--r--forstio/async/async.tmpl.h769
8 files changed, 2363 insertions, 3 deletions
diff --git a/default.nix b/default.nix
index 1eada57..9c99edf 100644
--- a/default.nix
+++ b/default.nix
@@ -1,10 +1,23 @@
{ pkgs ? import <nixpkgs> {}
}:
-{
+let
+ core_src = ./forstio/core;
+ async_src = ./forstio/async;
+ version = "0.0.0";
+in rec {
forstio = {
- core = pkgs.callPackage ./forstio/core/.nix/derivation.nix {
- version = "0.0.0";
+ core = pkgs.callPackage "${core_src}/.nix/derivation.nix" {
+ inherit version;
+ clang = pkgs.clang_15;
+ clang-tools = pkgs.clang-tools_15;
+ };
+
+ async = pkgs.callPackage "${async_src}/.nix/derivation.nix" {
+ inherit version;
+ inherit forstio;
+ clang = pkgs.clang_15;
+ clang-tools = pkgs.clang-tools_15;
};
};
}
diff --git a/forstio/SConscript b/forstio/SConscript
index 08ac0f0..8da5a3d 100644
--- a/forstio/SConscript
+++ b/forstio/SConscript
@@ -5,3 +5,4 @@ Import('env')
# Export to other libs
Export('env');
SConscript('core/SConscript');
+SConscript('async/SConscript');
diff --git a/forstio/async/.nix/derivation.nix b/forstio/async/.nix/derivation.nix
new file mode 100644
index 0000000..8ceac08
--- /dev/null
+++ b/forstio/async/.nix/derivation.nix
@@ -0,0 +1,31 @@
+{ lib
+, stdenvNoCC
+, scons
+, clang
+, clang-tools
+, version
+, forstio
+}:
+
+let
+
+in stdenvNoCC.mkDerivation {
+ pname = "forstio-async";
+ inherit version;
+
+ src = ./..;
+
+ enableParallelBuilding = true;
+
+ nativeBuildInputs = [
+ scons
+ clang
+ clang-tools
+ ];
+
+ buildInputs = [
+ forstio.core
+ ];
+
+ outputs = ["out" "dev"];
+}
diff --git a/forstio/async/SConscript b/forstio/async/SConscript
new file mode 100644
index 0000000..0b1e7dd
--- /dev/null
+++ b/forstio/async/SConscript
@@ -0,0 +1,32 @@
+#!/bin/false
+
+import os
+import os.path
+import glob
+
+
+Import('env')
+
+dir_path = Dir('.').abspath
+
+env.sources += sorted(glob.glob(dir_path + "/*.cpp"))
+env.headers += sorted(glob.glob(dir_path + "/*.h"))
+
+# Environment for base library
+base_lib_env = env.Clone();
+
+## Shared lib
+objects_shared = []
+base_lib_env.add_source_files(objects_shared, env.sources, shared=True);
+env.library_shared = base_lib_env.SharedLibrary('#build/forstio-async', [objects_shared]);
+
+## Static lib
+objects_static = []
+base_lib_env.add_source_files(objects_static, env.sources, shared=False);
+env.library_static = base_lib_env.StaticLibrary('#build/forstio-async', [objects_static]);
+
+# Export to other libs
+Export('base_lib_env')
+
+# Set Alias
+env.Alias('library', [env.library_shared, env.library_static]);
diff --git a/forstio/async/SConstruct b/forstio/async/SConstruct
new file mode 100644
index 0000000..2d5050f
--- /dev/null
+++ b/forstio/async/SConstruct
@@ -0,0 +1,72 @@
+#!/usr/bin/env python3
+
+import sys
+import os
+import os.path
+import glob
+import re
+
+
+if sys.version_info < (3,):
+ def isbasestring(s):
+ return isinstance(s,basestring)
+else:
+ def isbasestring(s):
+ return isinstance(s, (str,bytes))
+
+def add_kel_source_files(self, sources, filetype, lib_env=None, shared=False, target_post=""):
+
+ if isbasestring(filetype):
+ dir_path = self.Dir('.').abspath
+ filetype = sorted(glob.glob(dir_path+"/"+filetype))
+
+ for path in filetype:
+ target_name = re.sub( r'(.*?)(\.cpp|\.c\+\+)', r'\1' + target_post, path )
+ if shared:
+ target_name+='.os'
+ sources.append( self.SharedObject( target=target_name, source=path ) )
+ else:
+ target_name+='.o'
+ sources.append( self.StaticObject( target=target_name, source=path ) )
+ pass
+
+def isAbsolutePath(key, dirname, env):
+ assert os.path.isabs(dirname), "%r must have absolute path syntax" % (key,)
+
+env_vars = Variables(
+ args=ARGUMENTS
+);
+
+env_vars.Add('prefix',
+ help='Installation target location of build results and headers',
+ default='/usr/local/',
+ validator=isAbsolutePath
+);
+
+env=Environment(ENV=os.environ, variables=env_vars, CPPPATH=[],
+ CPPDEFINES=['SAW_UNIX'],
+ CXXFLAGS=['-std=c++20','-g','-Wall','-Wextra'],
+ LIBS=[]);
+env.__class__.add_source_files = add_kel_source_files;
+env.Tool('compilation_db');
+env.cdb = env.CompilationDatabase('compile_commands.json');
+
+env.objects = [];
+env.sources = [];
+env.headers = [];
+
+Export('env');
+SConscript('src/forstio/async/SConscript');
+
+# Tests
+# SConscript('tests/SConscript')
+# env.Alias('test', env.test_program)
+
+env.Alias('cdb', env.cdb);
+env.Alias('all', ['library']);# , 'test'])
+env.Default('all');
+
+env.Install('$prefix/lib/', [env.library_shared, env.library_static]);
+env.Install('$prefix/include/forstio/async/', [env.headers]);
+
+env.Alias('install', '$prefix');
diff --git a/forstio/async/async.cpp b/forstio/async/async.cpp
new file mode 100644
index 0000000..7d177f5
--- /dev/null
+++ b/forstio/async/async.cpp
@@ -0,0 +1,419 @@
+#include "async.h"
+#include <forstio/common.h>
+#include <forstio/error.h>
+
+#include <algorithm>
+#include <cassert>
+
+namespace saw {
+namespace {
+thread_local event_loop *local_loop = nullptr;
+
+event_loop &current_event_loop() {
+ event_loop *loop = local_loop;
+ assert(loop);
+ return *loop;
+}
+} // namespace
+
+conveyor_node::conveyor_node() {}
+
+conveyor_node_with_child_mixin::conveyor_node_with_child_mixin(
+ own<conveyor_node> &&child_, conveyor_node &owner)
+ : child{std::move(child_)} {
+ assert(child);
+
+ child->notify_parent_attached(owner);
+}
+
+error_or<own<conveyor_node>>
+conveyor_node_with_child_mixin::swap_child(own<conveyor_node> &&swapee) {
+ SAW_ASSERT(child) {
+ return critical_error("Child should exist if this function is called");
+ }
+ own<conveyor_node> old_child = std::move(child);
+
+ /**
+ * We need the parent of the old_child's next storage
+ */
+ conveyor_storage *old_storage = old_child->next_storage();
+ conveyor_storage *old_storage_parent =
+ old_storage ? old_storage->get_parent() : nullptr;
+
+ /**
+ * Swap in the new child
+ */
+ if (swapee) {
+ child = std::move(swapee);
+
+ /**
+ * Then we need to set the new child's storage parent since the next
+ * storage has a nullptr set And if the old_storage_parent is a nullptr,
+ * then it doesn't matter. So we don't check for it
+ */
+ conveyor_storage *swapee_storage = child->next_storage();
+ if (swapee_storage) {
+ swapee_storage->set_parent(old_storage_parent);
+ }
+ }
+
+ return old_child;
+}
+
+conveyor_storage::conveyor_storage() {}
+
+conveyor_storage::~conveyor_storage() {}
+
+conveyor_storage *conveyor_storage::get_parent() const { return parent_; }
+
+void conveyor_event_storage::set_parent(conveyor_storage *p) {
+ /*
+ * parent check isn't needed, but is used
+ * for the assert, because the storage should
+ * be armed if there was an element present
+ * and a valid parent
+ */
+ if (/*!parent && */ p && !is_armed() && queued() > 0) {
+ assert(!parent_);
+ if (p->space() > 0) {
+ arm_later();
+ }
+ }
+
+ parent_ = p;
+}
+
+conveyor_event_storage::conveyor_event_storage() : conveyor_storage{} {}
+
+conveyor_base::conveyor_base(own<conveyor_node> &&node_p)
+ : node_{std::move(node_p)} {}
+
+error propagate_error::operator()(const error &error) const {
+ return error.copy_error();
+}
+
+error propagate_error::operator()(error &&err) { return std::move(err); }
+
+event::event() : event(current_event_loop()) {}
+
+event::event(event_loop &loop) : loop_{loop} {}
+
+event::~event() { disarm(); }
+
+void event::arm_next() {
+ assert(&loop_ == local_loop);
+ if (prev_ == nullptr) {
+ // Push the next_insert_point back by one
+ // and inserts itself before that
+ next_ = *loop_.next_insert_point_;
+ prev_ = loop_.next_insert_point_;
+ *prev_ = this;
+ if (next_) {
+ next_->prev_ = &next_;
+ }
+
+ // Set the new insertion ptr location to next
+ loop_.next_insert_point_ = &next_;
+
+ // Pushes back the later insert point if it was pointing at the
+ // previous event
+ if (loop_.later_insert_point_ == prev_) {
+ loop_.later_insert_point_ = &next_;
+ }
+
+ // If tail_ points at the same location then
+ // we are at the end and have to update tail_ then.
+ // Technically should be possible by checking if
+ // next is a `nullptr`
+ if (loop_.tail_ == prev_) {
+ loop_.tail_ = &next_;
+ }
+
+ loop_.set_runnable(true);
+ }
+}
+
+void event::arm_later() {
+ assert(&loop_ == local_loop);
+
+ if (prev_ == nullptr) {
+ next_ = *loop_.later_insert_point_;
+ prev_ = loop_.later_insert_point_;
+ *prev_ = this;
+ if (next_) {
+ next_->prev_ = &next_;
+ }
+
+ loop_.later_insert_point_ = &next_;
+ if (loop_.tail_ == prev_) {
+ loop_.tail_ = &next_;
+ }
+
+ loop_.set_runnable(true);
+ }
+}
+
+void event::arm_last() {
+ assert(&loop_ == local_loop);
+
+ if (prev_ == nullptr) {
+ next_ = *loop_.later_insert_point_;
+ prev_ = loop_.later_insert_point_;
+ *prev_ = this;
+ if (next_) {
+ next_->prev_ = &next_;
+ }
+
+ if (loop_.tail_ == prev_) {
+ loop_.tail_ = &next_;
+ }
+
+ loop_.set_runnable(true);
+ }
+}
+
+void event::disarm() {
+ if (prev_ != nullptr) {
+ if (loop_.tail_ == &next_) {
+ loop_.tail_ = prev_;
+ }
+
+ if (loop_.next_insert_point_ == &next_) {
+ loop_.next_insert_point_ = prev_;
+ }
+
+ *prev_ = next_;
+ if (next_) {
+ next_->prev_ = prev_;
+ }
+
+ prev_ = nullptr;
+ next_ = nullptr;
+ }
+}
+
+bool event::is_armed() const { return prev_ != nullptr; }
+
+conveyor_sink::conveyor_sink() : node_{nullptr} {}
+
+conveyor_sink::conveyor_sink(own<conveyor_node> &&node_p)
+ : node_{std::move(node_p)} {}
+
+void event_loop::set_runnable(bool runnable) { is_runnable_ = runnable; }
+
+event_loop::event_loop() {}
+
+event_loop::event_loop(own<class event_port> &&ep)
+ : event_port_{std::move(ep)} {}
+
+event_loop::~event_loop() { assert(local_loop != this); }
+
+void event_loop::enter_scope() {
+ assert(!local_loop);
+ local_loop = this;
+}
+
+void event_loop::leave_scope() {
+ assert(local_loop == this);
+ local_loop = nullptr;
+}
+
+bool event_loop::turn_loop() {
+ size_t turn_step = 0;
+ while (head_ && turn_step < 65536) {
+ if (!turn()) {
+ return false;
+ }
+ ++turn_step;
+ }
+ return true;
+}
+
+bool event_loop::turn() {
+ event *event = head_;
+
+ if (!event) {
+ return false;
+ }
+
+ head_ = event->next_;
+ if (head_) {
+ head_->prev_ = &head_;
+ }
+
+ next_insert_point_ = &head_;
+ if (later_insert_point_ == &event->next_) {
+ later_insert_point_ = &head_;
+ }
+ if (tail_ == &event->next_) {
+ tail_ = &head_;
+ }
+
+ event->next_ = nullptr;
+ event->prev_ = nullptr;
+
+ next_insert_point_ = &head_;
+
+ event->fire();
+
+ return true;
+}
+
+bool event_loop::wait(const std::chrono::steady_clock::duration &duration) {
+ if (event_port_) {
+ event_port_->wait(duration);
+ }
+
+ return turn_loop();
+}
+
+bool event_loop::wait(const std::chrono::steady_clock::time_point &time_point) {
+ if (event_port_) {
+ event_port_->wait(time_point);
+ }
+
+ return turn_loop();
+}
+
+bool event_loop::wait() {
+ if (event_port_) {
+ event_port_->wait();
+ }
+
+ return turn_loop();
+}
+
+bool event_loop::poll() {
+ if (event_port_) {
+ event_port_->poll();
+ }
+
+ return turn_loop();
+}
+
+event_port *event_loop::event_port() { return event_port_.get(); }
+
+conveyor_sink_set &event_loop::daemon() {
+ if (!daemon_sink_) {
+ daemon_sink_ = heap<conveyor_sink_set>();
+ }
+ return *daemon_sink_;
+}
+
+wait_scope::wait_scope(event_loop &loop) : loop_{loop} { loop_.enter_scope(); }
+
+wait_scope::~wait_scope() { loop_.leave_scope(); }
+
+void wait_scope::wait() { loop_.wait(); }
+
+void wait_scope::wait(const std::chrono::steady_clock::duration &duration) {
+ loop_.wait(duration);
+}
+
+void wait_scope::wait(const std::chrono::steady_clock::time_point &time_point) {
+ loop_.wait(time_point);
+}
+
+void wait_scope::poll() { loop_.poll(); }
+
+error_or<own<conveyor_node>>
+convert_conveyor_node_base::swap_child(own<conveyor_node> &&swapee) noexcept {
+ return child_mixin_.swap_child(std::move(swapee));
+}
+
+conveyor_storage *convert_conveyor_node_base::next_storage() noexcept {
+ if (!child_mixin_.child) {
+ return nullptr;
+ }
+ return child_mixin_.child->next_storage();
+}
+
+immediate_conveyor_node_base::immediate_conveyor_node_base()
+ : conveyor_event_storage{} {}
+
+merge_conveyor_node_base::merge_conveyor_node_base()
+ : conveyor_event_storage{} {}
+
+error_or<own<conveyor_node>> queue_buffer_conveyor_node_base::swap_child(
+ own<conveyor_node> &&swapee_) noexcept {
+ return child_mixin_.swap_child(std::move(swapee_));
+}
+
+void conveyor_sink_set::destroy_sink_conveyor_node(conveyor_node &node) {
+ if (!is_armed()) {
+ arm_last();
+ }
+
+ delete_nodes_.push(&node);
+}
+
+void conveyor_sink_set::fail(error &&error) {
+ /// @todo call error_handler
+}
+
+conveyor_sink_set::conveyor_sink_set(event_loop &event_loop)
+ : event{event_loop} {}
+
+void conveyor_sink_set::add(conveyor<void> &&sink) {
+ auto nas = conveyor<void>::from_conveyor(std::move(sink));
+ SAW_ASSERT(nas) { return; }
+ conveyor_storage *storage = nas->next_storage();
+
+ own<sink_conveyor_node> sink_node = nullptr;
+ try {
+ sink_node = heap<sink_conveyor_node>(std::move(nas), *this);
+ } catch (std::bad_alloc &) {
+ return;
+ }
+ if (storage) {
+ storage->set_parent(sink_node.get());
+ }
+
+ sink_nodes_.emplace_back(std::move(sink_node));
+}
+
+void conveyor_sink_set::fire() {
+ while (!delete_nodes_.empty()) {
+ conveyor_node *node = delete_nodes_.front();
+ /*auto erased = */ std::remove_if(sink_nodes_.begin(),
+ sink_nodes_.end(),
+ [node](own<conveyor_node> &element) {
+ return node == element.get();
+ });
+ delete_nodes_.pop();
+ }
+}
+
+convert_conveyor_node_base::convert_conveyor_node_base(own<conveyor_node> &&dep)
+ : child_mixin_{std::move(dep), *this} {}
+
+void convert_conveyor_node_base::get_result(error_or_value &err_or_val) {
+ get_impl(err_or_val);
+}
+
+void attach_conveyor_node_base::get_result(
+ error_or_value &err_or_val) noexcept {
+ if (child_mixin_.child) {
+ child_mixin_.child->get_result(err_or_val);
+ }
+}
+
+error_or<own<conveyor_node>>
+attach_conveyor_node_base::swap_child(own<conveyor_node> &&swapee_) noexcept {
+ return child_mixin_.swap_child(std::move(swapee_));
+}
+
+conveyor_storage *attach_conveyor_node_base::next_storage() noexcept {
+ if (!child_mixin_.child) {
+ return nullptr;
+ }
+
+ return child_mixin_.child->next_storage();
+}
+
+void detach_conveyor(conveyor<void> &&conveyor) {
+ event_loop &loop = current_event_loop();
+ conveyor_sink_set &sink = loop.daemon();
+ sink.add(std::move(conveyor));
+}
+} // namespace saw
diff --git a/forstio/async/async.h b/forstio/async/async.h
new file mode 100644
index 0000000..4e4f230
--- /dev/null
+++ b/forstio/async/async.h
@@ -0,0 +1,1023 @@
+#pragma once
+
+#include <forstio/common.h>
+#include <forstio/error.h>
+
+#include <chrono>
+#include <functional>
+#include <limits>
+#include <list>
+#include <queue>
+#include <type_traits>
+
+namespace saw {
+class conveyor_storage;
+class conveyor_node {
+public:
+ conveyor_node();
+ virtual ~conveyor_node() = default;
+
+ /**
+ * Internal method to retrieve results from children
+ */
+ virtual void get_result(error_or_value &err_or_val) = 0;
+
+ /**
+ * Swap out child with another one
+ */
+ virtual error_or<own<conveyor_node>>
+ swap_child(own<conveyor_node> &&swapee_) = 0;
+
+ /**
+ * Retrieve the next storage node
+ */
+ virtual conveyor_storage *next_storage() = 0;
+
+ /**
+ * Notify that a new parent was attached
+ * Only relevant for the feeding nodes
+ */
+ virtual void notify_parent_attached(conveyor_node &){};
+};
+
+class conveyor_node_with_child_mixin final {
+public:
+ own<conveyor_node> child = nullptr;
+
+ conveyor_node_with_child_mixin(own<conveyor_node> &&child_,
+ conveyor_node &owner_);
+ ~conveyor_node_with_child_mixin() = default;
+
+ /**
+ * Swap out children and return the child ptr, since the caller is the child
+ * itself. Stack needs to be cleared before the child is destroyed, so the
+ * swapped out node is returned as well.
+ */
+ error_or<own<conveyor_node>> swap_child(own<conveyor_node> &&swapee);
+};
+
+class conveyor_node_with_parent_mixin final {
+public:
+ conveyor_node *parent = nullptr;
+
+ error_or<own<conveyor_node>>
+ swap_child_of_parent(own<conveyor_node> &&swapee) {
+ SAW_ASSERT(parent) {
+ return critical_error(
+ "Can't swap child, because parent doesn't exist");
+ }
+
+ return parent->swap_child(std::move(swapee));
+ }
+ void change_parent(conveyor_node *p) { parent = p; }
+};
+
+class event_loop;
+class wait_scope;
+/*
+ * Event class similar to capn'proto.
+ * https://github.com/capnproto/capnproto
+ */
+class event {
+private:
+ event_loop &loop_;
+ event **prev_ = nullptr;
+ event *next_ = nullptr;
+
+ friend class event_loop;
+
+public:
+ event();
+ event(event_loop &loop);
+ virtual ~event();
+
+ virtual void fire() = 0;
+
+ void arm_next();
+ void arm_later();
+ void arm_last();
+ void disarm();
+
+ bool is_armed() const;
+};
+
+class conveyor_storage {
+protected:
+ conveyor_storage *parent_ = nullptr;
+
+public:
+ conveyor_storage();
+ virtual ~conveyor_storage();
+
+ virtual size_t space() const = 0;
+ virtual size_t queued() const = 0;
+ virtual void child_has_fired() = 0;
+ virtual void parent_has_fired() = 0;
+
+ virtual void set_parent(conveyor_storage *parent) = 0;
+ conveyor_storage *get_parent() const;
+};
+
+class conveyor_event_storage : public conveyor_storage, public event {
+public:
+ conveyor_event_storage();
+ virtual ~conveyor_event_storage() = default;
+
+ void set_parent(conveyor_storage *parent) override;
+};
+
+class conveyor_base {
+protected:
+ own<conveyor_node> node_;
+
+public:
+ conveyor_base(own<conveyor_node> &&node_p);
+ virtual ~conveyor_base() = default;
+
+ conveyor_base(conveyor_base &&) = default;
+ conveyor_base &operator=(conveyor_base &&) = default;
+
+ void get(error_or_value &err_or_val);
+};
+
+template <typename T> class conveyor;
+
+template <typename T> conveyor<T> chained_conveyor_type(T *);
+
+// template <typename T> Conveyor<T> chainedConveyorType(Conveyor<T> *);
+
+template <typename T> T remove_error_or_type(T *);
+
+template <typename T> T remove_error_or_type(error_or<T> *);
+
+template <typename T>
+using remove_error_or = decltype(remove_error_or_type((T *)nullptr));
+
+template <typename T>
+using chained_conveyors = decltype(chained_conveyor_type((T *)nullptr));
+
+template <typename Func, typename T>
+using conveyor_result =
+ chained_conveyors<remove_error_or<return_type<Func, T>>>;
+
+struct propagate_error {
+public:
+ error operator()(const error &err) const;
+ error operator()(error &&err);
+};
+
+class conveyor_sink {
+private:
+ own<conveyor_node> node_;
+
+public:
+ conveyor_sink();
+ conveyor_sink(own<conveyor_node> &&node);
+
+ conveyor_sink(conveyor_sink &&) = default;
+ conveyor_sink &operator=(conveyor_sink &&) = default;
+};
+
+template <typename T> class merge_conveyor_node_data;
+
+template <typename T> class merge_conveyor {
+private:
+ lent<merge_conveyor_node_data<T>> data_;
+
+public:
+ merge_conveyor() = default;
+ merge_conveyor(lent<merge_conveyor_node_data<T>> d);
+ ~merge_conveyor();
+
+ void attach(conveyor<T> conv);
+};
+
+/**
+ * Main interface for async operations.
+ */
+template <typename T> class conveyor final : public conveyor_base {
+public:
+ /**
+ * Construct an immediately fulfilled node
+ */
+ conveyor(fix_void<T> value);
+
+ /**
+ * Construct an immediately failed node
+ */
+ conveyor(error &&err);
+
+ /**
+ * Construct a conveyor with a child node
+ */
+ conveyor(own<conveyor_node> node_p);
+
+ conveyor(conveyor<T> &&) = default;
+ conveyor<T> &operator=(conveyor<T> &&) = default;
+
+ /**
+ * This method converts values or errors from children
+ */
+ template <typename Func, typename ErrorFunc = propagate_error>
+ [[nodiscard]] conveyor_result<Func, T>
+ then(Func &&func, ErrorFunc &&error_func = propagate_error());
+
+ /**
+ * This method adds a buffer node in the conveyor chains which acts as a
+ * scheduler interrupt point and collects elements up to the supplied limit.
+ */
+ [[nodiscard]] conveyor<T>
+ buffer(size_t limit = std::numeric_limits<size_t>::max());
+
+ /**
+ * This method just takes ownership of any supplied types,
+ * which are destroyed when the chain gets destroyed.
+ * Useful for resource lifetime control.
+ */
+ template <typename... Args>
+ [[nodiscard]] conveyor<T> attach(Args &&...args);
+
+ /** @todo implement
+ * This method limits the total amount of passed elements
+ * Be careful where you place this node into the chain.
+ * If you meant to fork it and destroy paths you shouldn't place
+ * an interrupt point between the fork and this limiter
+ */
+ [[nodiscard]] conveyor<T> limit(size_t val = 1);
+
+ /**
+ *
+ */
+ [[nodiscard]] std::pair<conveyor<T>, merge_conveyor<T>> merge();
+
+ /**
+ * Moves the conveyor chain into a thread local storage point which drops
+ * every element. Use sink() if you want to control the lifetime of a
+ * conveyor chain
+ */
+ template <typename ErrorFunc = propagate_error>
+ void detach(ErrorFunc &&err_func = propagate_error());
+ /**
+ * Creates a local sink which drops elements, but lifetime control remains
+ * in your hand.
+ */
+ template <typename ErrorFunc = propagate_error>
+ [[nodiscard]] conveyor_sink
+ sink(ErrorFunc &&error_func = propagate_error());
+
+ /**
+ * If no sink() or detach() is used you have to take elements out of the
+ * chain yourself.
+ */
+ error_or<fix_void<T>> take();
+
+ /** @todo implement
+ * Specifically pump elements through this chain with the provided
+ * wait_scope
+ */
+ void poll(wait_scope &wait_scope);
+
+ // helper
+ static conveyor<T> to_conveyor(own<conveyor_node> node);
+
+ // helper
+ static own<conveyor_node> from_conveyor(conveyor<T> conveyor);
+};
+
+template <typename Func> conveyor_result<Func, void> exec_later(Func &&func);
+
+/*
+ * Join Conveyors into a single one
+ */
+template <typename... Args>
+conveyor<std::tuple<Args...>>
+join_conveyors(std::tuple<conveyor<Args>...> &conveyors);
+
+template <typename T> class conveyor_feeder {
+public:
+ virtual ~conveyor_feeder() = default;
+
+ virtual void feed(T &&data) = 0;
+ virtual void fail(error &&error) = 0;
+
+ virtual size_t space() const = 0;
+ virtual size_t queued() const = 0;
+
+ virtual error swap(conveyor<T> &&conveyor) noexcept = 0;
+};
+
+template <> class conveyor_feeder<void> {
+public:
+ virtual ~conveyor_feeder() = default;
+
+ virtual void feed(void_t &&value = void_t{}) = 0;
+ virtual void fail(error &&error) = 0;
+
+ virtual size_t space() const = 0;
+ virtual size_t queued() const = 0;
+
+ virtual error swap(conveyor<void_t> &&conveyor) noexcept = 0;
+};
+
+template <typename T> struct conveyor_and_feeder {
+ own<conveyor_feeder<T>> feeder;
+ conveyor<T> conveyor;
+};
+
+template <typename T> conveyor_and_feeder<T> new_conveyor_and_feeder();
+
+template <typename T> conveyor_and_feeder<T> one_time_conveyor_and_feeder();
+
+enum class Signal : uint8_t { Terminate, User1 };
+
+/**
+ * Class which acts as a correspondent between the running framework and outside
+ * events which may be signals from the operating system or just other threads.
+ * Default EventPorts are supplied by setupAsyncIo() in io.h
+ */
+class event_port {
+public:
+ virtual ~event_port() = default;
+
+ virtual conveyor<void> on_signal(Signal signal) = 0;
+
+ virtual void poll() = 0;
+ virtual void wait() = 0;
+ virtual void wait(const std::chrono::steady_clock::duration &) = 0;
+ virtual void wait(const std::chrono::steady_clock::time_point &) = 0;
+
+ virtual void wake() = 0;
+};
+
+class sink_conveyor_node;
+
+class conveyor_sink_set final : public event {
+private:
+ /*
+ class Helper final : public Event {
+ private:
+ void destroySinkConveyorNode(ConveyorNode& sink);
+ void fail(Error&& error);
+
+ std::vector<Own<ConveyorNode>> sink_nodes;
+ std::queue<ConveyorNode*> delete_nodes;
+ std::function<void(Error&& error)> error_handler;
+
+ public:
+ ConveyorSinks() = default;
+ ConveyorSinks(EventLoop& event_loop);
+
+ void add(Conveyor<void> node);
+
+ void fire() override {}
+ };
+
+ gin::Own<Helper> helper;
+ */
+ friend class sink_conveyor_node;
+
+ void destroy_sink_conveyor_node(conveyor_node &sink_node);
+ void fail(error &&err);
+
+ std::list<own<conveyor_node>> sink_nodes_;
+
+ std::queue<conveyor_node *> delete_nodes_;
+
+ std::function<void(error &&)> error_handler_;
+
+public:
+ // ConveyorSinks();
+ // ConveyorSinks(EventLoop& event_loop);
+ conveyor_sink_set() = default;
+ conveyor_sink_set(event_loop &event_loop);
+
+ void add(conveyor<void> &&node);
+
+ void fire() override;
+};
+
+/*
+ * EventLoop class similar to capn'proto.
+ * https://github.com/capnproto/capnproto
+ */
+class event_loop {
+private:
+ friend class event;
+ event *head_ = nullptr;
+ event **tail_ = &head_;
+ event **next_insert_point_ = &head_;
+ event **later_insert_point_ = &head_;
+
+ bool is_runnable_ = false;
+
+ own<event_port> event_port_ = nullptr;
+
+ own<conveyor_sink_set> daemon_sink_ = nullptr;
+
+ // functions
+ void set_runnable(bool runnable);
+
+ friend class wait_scope;
+ void enter_scope();
+ void leave_scope();
+
+ bool turn_loop();
+ bool turn();
+
+public:
+ event_loop();
+ event_loop(own<event_port> &&port);
+ ~event_loop();
+
+ event_loop(event_loop &&) = default;
+ event_loop &operator=(event_loop &&) = default;
+
+ bool wait();
+ bool wait(const std::chrono::steady_clock::duration &);
+ bool wait(const std::chrono::steady_clock::time_point &);
+ bool poll();
+
+ event_port *event_port();
+
+ conveyor_sink_set &daemon();
+};
+
+/*
+ * WaitScope class similar to capn'proto.
+ * https://github.com/capnproto/capnproto
+ */
+class wait_scope {
+private:
+ event_loop &loop_;
+
+public:
+ wait_scope(event_loop &loop);
+ ~wait_scope();
+
+ void wait();
+ void wait(const std::chrono::steady_clock::duration &);
+ void wait(const std::chrono::steady_clock::time_point &);
+ void poll();
+};
+
+template <typename Func> conveyor_result<Func, void> yield_next(Func &&func);
+
+template <typename Func> conveyor_result<Func, void> yield_later(Func &&func);
+
+template <typename Func> conveyor_result<Func, void> yield_last(Func &&func);
+} // namespace saw
+
+// Secret stuff
+// Aka private semi hidden classes
+namespace saw {
+
+template <typename Out, typename In> struct fix_void_caller {
+ template <typename Func> static Out apply(Func &func, In &&in) {
+ return func(std::move(in));
+ }
+};
+
+template <typename Out> struct fix_void_caller<Out, void_t> {
+ template <typename Func> static Out apply(Func &func, void_t &&in) {
+ (void)in;
+ return func();
+ }
+};
+
+template <typename In> struct fix_void_caller<void_t, In> {
+ template <typename Func> static void_t apply(Func &func, In &&in) {
+ func(std::move(in));
+ return void_t{};
+ }
+};
+
+template <> struct fix_void_caller<void_t, void_t> {
+ template <typename Func> static void_t apply(Func &func, void_t &&in) {
+ (void)in;
+ func();
+ return void_t{};
+ }
+};
+
+template <typename T> class adapt_conveyor_node;
+
+template <typename T>
+class adapt_conveyor_feeder final : public conveyor_feeder<unfix_void<T>> {
+private:
+ adapt_conveyor_node<T> *feedee_ = nullptr;
+
+public:
+ ~adapt_conveyor_feeder();
+
+ void set_feedee(adapt_conveyor_node<T> *feedee);
+
+ void feed(T &&value) override;
+ void fail(error &&error) override;
+
+ size_t space() const override;
+ size_t queued() const override;
+
+ error swap(conveyor<T> &&conv) noexcept override;
+};
+
+template <typename T>
+class adapt_conveyor_node final : public conveyor_node,
+ public conveyor_event_storage {
+private:
+ adapt_conveyor_feeder<T> *feeder_ = nullptr;
+
+ std::queue<error_or<unfix_void<T>>> storage_;
+
+ conveyor_node_with_parent_mixin parent_node_;
+
+public:
+ adapt_conveyor_node();
+ ~adapt_conveyor_node();
+
+ void set_feeder(adapt_conveyor_feeder<T> *feeder);
+
+ void feed(T &&value);
+ void fail(error &&error);
+
+ // ConveyorNode
+ void get_result(error_or_value &err_or_val) override;
+
+ error_or<own<conveyor_node>>
+ swap_child(own<conveyor_node> &&swapee) noexcept override;
+
+ conveyor_storage *next_storage() noexcept override;
+ void notify_parent_attached(conveyor_node &) noexcept override;
+
+ // ConveyorStorage
+ size_t space() const override;
+ size_t queued() const override;
+
+ void child_has_fired() override;
+ void parent_has_fired() override;
+
+ // Event
+ void fire() override;
+};
+
+template <typename T> class one_time_conveyor_node;
+
+template <typename T>
+class one_time_conveyor_feeder final : public conveyor_feeder<unfix_void<T>> {
+private:
+ one_time_conveyor_node<T> *feedee_ = nullptr;
+
+public:
+ ~one_time_conveyor_feeder();
+
+ void set_feedee(one_time_conveyor_node<T> *feedee);
+
+ void feed(T &&value) override;
+ void fail(error &&error) override;
+
+ size_t space() const override;
+ size_t queued() const override;
+};
+
+template <typename T>
+class one_time_conveyor_node final : public conveyor_node,
+ public conveyor_storage,
+ public event {
+private:
+ one_time_conveyor_feeder<T> *feeder_ = nullptr;
+
+ bool passed_ = false;
+ maybe<error_or<T>> storage_ = std::nullopt;
+
+public:
+ ~one_time_conveyor_node();
+
+ void set_feeder(one_time_conveyor_feeder<T> *feeder);
+
+ void feed(T &&value);
+ void fail(error &&error);
+
+ // ConveyorNode
+ void get_result(error_or_value &err_or_val) override;
+
+ error_or<own<conveyor_node>>
+ swap_child(own<conveyor_node> &&swapee) override;
+
+ // ConveyorStorage
+ size_t space() const override;
+ size_t queued() const override;
+
+ void child_has_fired() override {}
+ void parent_has_fired() override;
+
+ // Event
+ void fire() override;
+};
+
+/**
+ * This class buffers and saves incoming elements and acts as an interrupt node
+ * for processing calls
+ */
+class queue_buffer_conveyor_node_base : public conveyor_node,
+ public conveyor_event_storage {
+protected:
+ conveyor_node_with_child_mixin child_mixin_;
+
+public:
+ queue_buffer_conveyor_node_base(own<conveyor_node> child_)
+ : conveyor_event_storage{}, child_mixin_{std::move(child_), *this} {}
+ virtual ~queue_buffer_conveyor_node_base() = default;
+
+ /**
+ * Use mixin
+ */
+ error_or<own<conveyor_node>>
+ swap_child(own<conveyor_node> &&swapee_) noexcept override;
+
+ conveyor_storage *next_storage() noexcept override {
+ return static_cast<conveyor_storage *>(this);
+ }
+};
+
+template <typename T>
+class queue_buffer_conveyor_node final
+ : public queue_buffer_conveyor_node_base {
+private:
+ std::queue<error_or<T>> storage_;
+ size_t max_store_;
+
+public:
+ queue_buffer_conveyor_node(own<conveyor_node> dep, size_t max_size)
+ : queue_buffer_conveyor_node_base{std::move(dep)}, max_store_{
+ max_size} {}
+ // Event
+ void fire() override;
+ // ConveyorNode
+ void get_result(error_or_value &eov) noexcept override;
+
+ // ConveyorStorage
+ size_t space() const override;
+ size_t queued() const override;
+
+ void child_has_fired() override;
+ void parent_has_fired() override;
+};
+
+class attach_conveyor_node_base : public conveyor_node {
+protected:
+ conveyor_node_with_child_mixin child_mixin_;
+
+public:
+ attach_conveyor_node_base(own<conveyor_node> &&child_)
+ : child_mixin_{std::move(child_), *this} {}
+
+ virtual ~attach_conveyor_node_base() = default;
+
+ void get_result(error_or_value &err_or_val) noexcept override;
+
+ /**
+ * Use mixin
+ */
+ error_or<own<conveyor_node>>
+ swap_child(own<conveyor_node> &&swapee_) noexcept override;
+
+ conveyor_storage *next_storage() noexcept override;
+};
+
+template <typename... Args>
+class attach_conveyor_node final : public attach_conveyor_node_base {
+public:
+ attach_conveyor_node(own<conveyor_node> &&dep, Args &&...args)
+ : attach_conveyor_node_base(std::move(dep)), attached_data_{
+ std::move(args...)} {}
+
+private:
+ std::tuple<Args...> attached_data_;
+};
+
+class convert_conveyor_node_base : public conveyor_node {
+public:
+ convert_conveyor_node_base(own<conveyor_node> &&dep);
+ virtual ~convert_conveyor_node_base() = default;
+
+ void get_result(error_or_value &err_or_val) override;
+
+ virtual void get_impl(error_or_value &err_or_val) = 0;
+
+ error_or<own<conveyor_node>>
+ swap_child(own<conveyor_node> &&swapee) noexcept override;
+
+ conveyor_storage *next_storage() noexcept override;
+
+protected:
+ conveyor_node_with_child_mixin child_mixin_;
+};
+
+template <typename T, typename DepT, typename Func, typename ErrorFunc>
+class convert_conveyor_node final : public convert_conveyor_node_base {
+private:
+ Func func_;
+ ErrorFunc error_func_;
+
+ static_assert(std::is_same<DepT, remove_error_or<DepT>>::value,
+ "Should never be of type ErrorOr");
+
+public:
+ convert_conveyor_node(own<conveyor_node> &&dep, Func &&func,
+ ErrorFunc &&error_func)
+ : convert_conveyor_node_base(std::move(dep)), func_{std::move(func)},
+ error_func_{std::move(error_func)} {}
+
+ void get_impl(error_or_value &err_or_val) noexcept override {
+ error_or<unfix_void<DepT>> dep_eov;
+ error_or<unfix_void<remove_error_or<T>>> &eov =
+ err_or_val.as<unfix_void<remove_error_or<T>>>();
+ if (child_mixin_.child) {
+ child_mixin_.child->get_result(dep_eov);
+ if (dep_eov.is_value()) {
+ try {
+
+ eov = fix_void_caller<T, DepT>::apply(
+ func_, std::move(dep_eov.value()));
+ } catch (const std::bad_alloc &) {
+ eov = critical_error("Out of memory");
+ } catch (const std::exception &) {
+ eov = critical_error(
+ "Exception in chain occured. Return ErrorOr<T> if you "
+ "want to handle errors which are recoverable");
+ }
+ } else if (dep_eov.is_error()) {
+ eov = error_func_(std::move(dep_eov.error()));
+ } else {
+ eov = critical_error("No value set in dependency");
+ }
+ } else {
+ eov = critical_error("Conveyor doesn't have child");
+ }
+ }
+};
+
+class sink_conveyor_node final : public conveyor_node,
+ public conveyor_event_storage {
+private:
+ conveyor_node_with_child_mixin child_mixin_;
+ conveyor_sink_set *conveyor_sink_;
+
+public:
+ sink_conveyor_node(own<conveyor_node> node, conveyor_sink_set &conv_sink)
+ : conveyor_event_storage{}, child_mixin_{std::move(node), *this},
+ conveyor_sink_{&conv_sink} {}
+
+ sink_conveyor_node(own<conveyor_node> node)
+ : conveyor_event_storage{}, child_mixin_{std::move(node), *this},
+ conveyor_sink_{nullptr} {}
+
+ // Event only queued if a critical error occured
+ void fire() override {
+ // Queued for destruction of children, because this acts as a sink and
+ // no other event should be here
+ child_mixin_.child = nullptr;
+
+ if (conveyor_sink_) {
+ conveyor_sink_->destroy_sink_conveyor_node(*this);
+ conveyor_sink_ = nullptr;
+ }
+ }
+
+ // ConveyorStorage
+ size_t space() const override { return 1; }
+ size_t queued() const override { return 0; }
+
+ // ConveyorNode
+ void get_result(error_or_value &err_or_val) noexcept override {
+ err_or_val.as<void_t>() =
+ critical_error("In a sink node no result can be returned");
+ }
+
+ error_or<own<conveyor_node>>
+ swap_child(own<conveyor_node> &&swapee) noexcept override {
+ return child_mixin_.swap_child(std::move(swapee));
+ }
+
+ // ConveyorStorage
+ void child_has_fired() override {
+ if (child_mixin_.child) {
+ error_or<void> dep_eov;
+ child_mixin_.child->get_result(dep_eov);
+ if (dep_eov.is_error()) {
+ if (dep_eov.error().is_critical()) {
+ if (!is_armed()) {
+ arm_last();
+ }
+ }
+ if (conveyor_sink_) {
+ conveyor_sink_->fail(std::move(dep_eov.error()));
+ }
+ }
+ }
+ }
+
+ /*
+ * No parent needs to be fired since we always have space
+ */
+ void parent_has_fired() override {}
+
+ conveyor_storage *next_storage() override {
+ // Should never happen though
+ assert(false);
+ return nullptr;
+ // return static_cast<ConveyorStorage*>(this);
+ }
+};
+
+class immediate_conveyor_node_base : public conveyor_node,
+ public conveyor_event_storage {
+private:
+public:
+ immediate_conveyor_node_base();
+
+ virtual ~immediate_conveyor_node_base() = default;
+
+ error_or<own<conveyor_node>>
+ swap_child(own<conveyor_node> &&swapee) noexcept override {
+ (void)swapee;
+ return recoverable_error("Node doesn't support swapping");
+ }
+
+ conveyor_storage *next_storage() noexcept override {
+ return static_cast<conveyor_storage *>(this);
+ }
+};
+
+template <typename T>
+class immediate_conveyor_node final : public immediate_conveyor_node_base {
+private:
+ error_or<fix_void<T>> value_;
+ uint8_t retrieved_;
+
+public:
+ immediate_conveyor_node(fix_void<T> &&val);
+ immediate_conveyor_node(error &&error);
+
+ // ConveyorStorage
+ size_t space() const override;
+ size_t queued() const override;
+
+ void child_has_fired() override;
+ void parent_has_fired() override;
+
+ // ConveyorNode
+ void get_result(error_or_value &err_or_val) noexcept override {
+ if (retrieved_ > 0) {
+ err_or_val.as<fix_void<T>>() =
+ make_error("Already taken value", error::code::Exhausted);
+ } else {
+ err_or_val.as<fix_void<T>>() = std::move(value_);
+ }
+ if (queued() > 0) {
+ ++retrieved_;
+ }
+ }
+
+ // Event
+ void fire() override;
+};
+
+/*
+ * Collects every incoming value and throws it in one lane
+ */
+class merge_conveyor_node_base : public conveyor_node,
+ public conveyor_event_storage {
+public:
+ merge_conveyor_node_base();
+
+ virtual ~merge_conveyor_node_base() = default;
+
+ conveyor_storage *next_storage() noexcept override {
+ return static_cast<conveyor_storage *>(this);
+ }
+};
+
+template <typename T>
+class merge_conveyor_node : public merge_conveyor_node_base {
+private:
+ class appendage final : public conveyor_node, public conveyor_storage {
+ public:
+ own<conveyor_node> child;
+ merge_conveyor_node *merger;
+
+ maybe<error_or<fix_void<T>>> error_or_value_;
+
+ public:
+ appendage(own<conveyor_node> n, merge_conveyor_node &m)
+ : conveyor_storage{}, child{std::move(n)}, merger{&m},
+ error_or_value_{std::nullopt} {}
+
+ bool child_storage_has_element_queued() const {
+ if (!child) {
+ return false;
+ }
+ conveyor_storage *storage = child->next_storage();
+ if (storage) {
+ return storage->queued() > 0;
+ }
+ return false;
+ }
+
+ void get_appendage_result(error_or_value &eov);
+
+ /**
+ * ConveyorNode
+ */
+ error_or<own<conveyor_node>>
+ swap_child(own<conveyor_node> &&swapee_) override;
+
+ conveyor_storage *next_storage() noexcept override {
+ return static_cast<conveyor_storage *>(this);
+ }
+
+ void get_result(error_or_value &err_or_val) override;
+
+ /**
+ * ConveyorStorage
+ */
+ size_t space() const override;
+
+ size_t queued() const override;
+
+ void child_has_fired() override;
+
+ void parent_has_fired() override;
+
+ void set_parent(conveyor_storage *par) override;
+ };
+
+ friend class merge_conveyor_node_data<T>;
+ friend class appendage;
+
+ our<merge_conveyor_node_data<T>> data_;
+ size_t next_appendage_ = 0;
+
+public:
+ merge_conveyor_node(our<merge_conveyor_node_data<T>> data);
+ ~merge_conveyor_node();
+ // ConveyorNode
+ error_or<own<conveyor_node>>
+ swap_child(own<conveyor_node> &&c) noexcept override;
+
+ // Event
+ void get_result(error_or_value &err_or_val) noexcept override;
+
+ void fire() override;
+
+ // ConveyorStorage
+ size_t space() const override;
+ size_t queued() const override;
+ void child_has_fired() override;
+ void parent_has_fired() override;
+};
+
+template <typename T> class merge_conveyor_node_data {
+public:
+ std::vector<own<typename merge_conveyor_node<T>::appendage>> appendages;
+
+ merge_conveyor_node<T> *merger = nullptr;
+
+public:
+ void attach(conveyor<T> conv);
+
+ void governing_node_destroyed();
+};
+
+/*
+class JoinConveyorNodeBase : public ConveyorNode, public ConveyorEventStorage {
+private:
+
+public:
+};
+
+template <typename... Args>
+class JoinConveyorNode final : public JoinConveyorNodeBase {
+private:
+ template<typename T>
+ class Appendage : public ConveyorEventStorage {
+ private:
+ Maybe<T> data = std::nullopt;
+
+ public:
+ size_t space() const override;
+ size_t queued() const override;
+
+ void fire() override;
+ void get_result(ErrorOrValue& eov) override;
+ };
+
+ std::tuple<Appendage<Args>...> appendages;
+
+public:
+};
+
+*/
+
+} // namespace saw
+
+#include "async.tmpl.h"
diff --git a/forstio/async/async.tmpl.h b/forstio/async/async.tmpl.h
new file mode 100644
index 0000000..e4dc54a
--- /dev/null
+++ b/forstio/async/async.tmpl.h
@@ -0,0 +1,769 @@
+#pragma once
+
+#include <forstio/common.h>
+#include <forstio/error.h>
+
+#include <cassert>
+// Template inlining
+
+#include <iostream>
+
+namespace saw {
+
+template <typename Func> conveyor_result<Func, void> execLater(Func &&func) {
+ conveyor<void> conveyor{fix_void<void>{}};
+ return conveyor.then(std::move(func));
+}
+
+template <typename T>
+conveyor<T>::conveyor(fix_void<T> value) : conveyor_base(nullptr) {
+ // Is there any way to do this?
+ // @todo new conveyor_base constructor for Immediate values
+
+ own<immediate_conveyor_node<fix_void<T>>> immediate =
+ heap<immediate_conveyor_node<fix_void<T>>>(std::move(value));
+
+ if (!immediate) {
+ return;
+ }
+
+ node_ = std::move(immediate);
+}
+
+template <typename T>
+conveyor<T>::conveyor(error &&err) : conveyor_base(nullptr) {
+ own<immediate_conveyor_node<fix_void<T>>> immediate =
+ heap<immediate_conveyor_node<fix_void<T>>>(std::move(err));
+
+ if (!immediate) {
+ return;
+ }
+
+ node_ = std::move(immediate);
+}
+
+template <typename T>
+conveyor<T>::conveyor(own<conveyor_node> node_p)
+ : conveyor_base{std::move(node_p)} {}
+
+template <typename T>
+template <typename Func, typename ErrorFunc>
+conveyor_result<Func, T> conveyor<T>::then(Func &&func,
+ ErrorFunc &&error_func) {
+ own<conveyor_node> conversion_node =
+ heap<convert_conveyor_node<fix_void<return_type<Func, T>>, fix_void<T>,
+ Func, ErrorFunc>>(
+ std::move(node_), std::move(func), std::move(error_func));
+
+ return conveyor<remove_error_or<return_type<Func, T>>>::to_conveyor(
+ std::move(conversion_node));
+}
+
+template <typename T> conveyor<T> conveyor<T>::buffer(size_t size) {
+ SAW_ASSERT(node_) { return conveyor<T>{own<conveyor_node>{nullptr}}; }
+ conveyor_storage *storage = node_->next_storage();
+ SAW_ASSERT(storage) { return conveyor<T>{own<conveyor_node>{nullptr}}; }
+
+ own<queue_buffer_conveyor_node<fix_void<T>>> storage_node =
+ heap<queue_buffer_conveyor_node<fix_void<T>>>(std::move(node_), size);
+
+ conveyor_storage *storage_ptr =
+ static_cast<conveyor_storage *>(storage_node.get());
+
+ storage->set_parent(storage_ptr);
+ return conveyor<T>{std::move(storage_node)};
+}
+
+template <typename T>
+template <typename... Args>
+conveyor<T> conveyor<T>::attach(Args &&...args) {
+ own<attach_conveyor_node<Args...>> attach_node =
+ heap<attach_conveyor_node<Args...>>(std::move(node_),
+ std::move(args...));
+ return conveyor<T>{std::move(attach_node)};
+}
+
+template <typename T>
+std::pair<conveyor<T>, merge_conveyor<T>> conveyor<T>::merge() {
+ our<merge_conveyor_node_data<T>> data =
+ share<merge_conveyor_node_data<T>>();
+
+ own<merge_conveyor_node<T>> merge_node = heap<merge_conveyor_node<T>>(data);
+
+ SAW_ASSERT(node_) {
+ return std::make_pair(conveyor<T>{own<conveyor_node>{nullptr}},
+ merge_conveyor<T>{});
+ }
+ conveyor_storage *storage = node_->next_storage();
+ SAW_ASSERT(storage) {
+ return std::make_pair(conveyor<T>{own<conveyor_node>{nullptr}},
+ merge_conveyor<T>{});
+ }
+
+ data->attach(conveyor<T>::to_conveyor(std::move(node_)));
+
+ merge_conveyor<T> node_ref{data};
+
+ return std::make_pair(conveyor<T>{std::move(merge_node)},
+ std::move(node_ref));
+}
+
+template <>
+template <typename ErrorFunc>
+conveyor_sink conveyor<void>::sink(ErrorFunc &&error_func) {
+ conveyor_storage *storage = node_->next_storage();
+ SAW_ASSERT(storage) { return conveyor_sink{}; }
+
+ own<sink_conveyor_node> sink_node =
+ heap<sink_conveyor_node>(std::move(node_));
+ conveyor_storage *storage_ptr =
+ static_cast<conveyor_storage *>(sink_node.get());
+
+ storage->set_parent(storage_ptr);
+
+ return conveyor_sink{std::move(sink_node)};
+}
+
+void detach_conveyor(conveyor<void> &&conveyor);
+
+template <typename T>
+template <typename ErrorFunc>
+void conveyor<T>::detach(ErrorFunc &&func) {
+ detach_conveyor(std::move(then([](T &&) {}, std::move(func))));
+}
+
+template <>
+template <typename ErrorFunc>
+void conveyor<void>::detach(ErrorFunc &&func) {
+ detach_conveyor(std::move(then([]() {}, std::move(func))));
+}
+
+template <typename T>
+conveyor<T> conveyor<T>::to_conveyor(own<conveyor_node> node) {
+ return conveyor<T>{std::move(node)};
+}
+
+template <typename T>
+own<conveyor_node> conveyor<T>::from_conveyor(conveyor<T> conveyor) {
+ return std::move(conveyor.node_);
+}
+
+template <typename T> error_or<fix_void<T>> conveyor<T>::take() {
+ SAW_ASSERT(node_) {
+ return error_or<fix_void<T>>{
+ critical_error("conveyor in invalid state")};
+ }
+ conveyor_storage *storage = node_->next_storage();
+ if (storage) {
+ if (storage->queued() > 0) {
+ error_or<fix_void<T>> result;
+ node_->get_result(result);
+ return result;
+ } else {
+ return error_or<fix_void<T>>{
+ recoverable_error("conveyor buffer has no elements")};
+ }
+ } else {
+ return error_or<fix_void<T>>{
+ critical_error("conveyor node has no child storage")};
+ }
+}
+
+template <typename T> conveyor_and_feeder<T> new_conveyor_and_feeder() {
+ own<adapt_conveyor_feeder<fix_void<T>>> feeder =
+ heap<adapt_conveyor_feeder<fix_void<T>>>();
+ own<adapt_conveyor_node<fix_void<T>>> node =
+ heap<adapt_conveyor_node<fix_void<T>>>();
+
+ feeder->set_feedee(node.get());
+ node->set_feeder(feeder.get());
+
+ return conveyor_and_feeder<T>{std::move(feeder),
+ conveyor<T>::to_conveyor(std::move(node))};
+}
+
+// QueueBuffer
+template <typename T> void queue_buffer_conveyor_node<T>::fire() {
+ if (child_mixin_.child) {
+ if (!storage_.empty()) {
+ if (storage_.front().is_error()) {
+ if (storage_.front().error().is_critical()) {
+ child_mixin_.child = nullptr;
+ }
+ }
+ }
+ }
+
+ bool has_space_before_fire = space() > 0;
+
+ if (parent_) {
+ parent_->child_has_fired();
+ if (!storage_.empty() && parent_->space() > 0) {
+ arm_later();
+ }
+ }
+
+ if (!child_mixin_.child) {
+ while (!storage_.empty()) {
+ storage_.pop();
+ }
+ return;
+ }
+
+ conveyor_storage *ch_storage = child_mixin_.child->next_storage();
+ if (ch_storage && !has_space_before_fire) {
+ ch_storage->parent_has_fired();
+ }
+}
+
+template <typename T>
+void queue_buffer_conveyor_node<T>::get_result(error_or_value &eov) noexcept {
+ error_or<T> &err_or_val = eov.as<T>();
+ err_or_val = std::move(storage_.front());
+ storage_.pop();
+}
+
+template <typename T> size_t queue_buffer_conveyor_node<T>::space() const {
+ return max_store_ - storage_.size();
+}
+
+template <typename T> size_t queue_buffer_conveyor_node<T>::queued() const {
+ return storage_.size();
+}
+
+template <typename T> void queue_buffer_conveyor_node<T>::child_has_fired() {
+ if (child_mixin_.child && storage_.size() < max_store_) {
+ error_or<T> eov;
+ child_mixin_.child->get_result(eov);
+
+ if (eov.is_error()) {
+ if (eov.error().is_critical()) {
+ }
+ }
+
+ storage_.push(std::move(eov));
+ if (!is_armed()) {
+ arm_later();
+ }
+ }
+}
+
+template <typename T> void queue_buffer_conveyor_node<T>::parent_has_fired() {
+ SAW_ASSERT(parent_) { return; }
+
+ if (parent_->space() == 0) {
+ return;
+ }
+
+ if (queued() > 0) {
+ arm_later();
+ }
+}
+
+template <typename T>
+immediate_conveyor_node<T>::immediate_conveyor_node(fix_void<T> &&val)
+ : value_{std::move(val)}, retrieved_{0} {}
+
+template <typename T>
+immediate_conveyor_node<T>::immediate_conveyor_node(error &&error)
+ : value_{std::move(error)}, retrieved_{0} {}
+
+template <typename T> size_t immediate_conveyor_node<T>::space() const {
+ return 0;
+}
+
+template <typename T> size_t immediate_conveyor_node<T>::queued() const {
+ return retrieved_ > 1 ? 0 : 1;
+}
+
+template <typename T> void immediate_conveyor_node<T>::child_has_fired() {
+ // Impossible case
+ assert(false);
+}
+
+template <typename T> void immediate_conveyor_node<T>::parent_has_fired() {
+ SAW_ASSERT(parent_) { return; }
+ assert(parent_->space() > 0);
+
+ if (queued() > 0) {
+ arm_next();
+ }
+}
+
+template <typename T> void immediate_conveyor_node<T>::fire() {
+
+ if (parent_) {
+ parent_->child_has_fired();
+ if (queued() > 0 && parent_->space() > 0) {
+ arm_last();
+ }
+ }
+}
+
+template <typename T>
+merge_conveyor<T>::merge_conveyor(lent<merge_conveyor_node_data<T>> d)
+ : data_{std::move(d)} {}
+
+template <typename T> merge_conveyor<T>::~merge_conveyor() {}
+
+template <typename T> void merge_conveyor<T>::attach(conveyor<T> conveyor) {
+ auto sp = data_.lock();
+ SAW_ASSERT(sp) { return; }
+
+ sp->attach(std::move(conveyor));
+}
+
+template <typename T>
+merge_conveyor_node<T>::merge_conveyor_node(our<merge_conveyor_node_data<T>> d)
+ : data_{d} {
+ SAW_ASSERT(data_) { return; }
+
+ data_->merger = this;
+}
+
+template <typename T> merge_conveyor_node<T>::~merge_conveyor_node() {}
+
+template <typename T>
+error_or<own<conveyor_node>>
+merge_conveyor_node<T>::swap_child(own<conveyor_node> &&swapee_) noexcept {
+ (void)swapee_;
+ return critical_error(
+ "merge_conveyor_node<T>::appendage should block calls to this class");
+}
+
+template <typename T>
+void merge_conveyor_node<T>::get_result(error_or_value &eov) noexcept {
+ error_or<fix_void<T>> &err_or_val = eov.as<fix_void<T>>();
+
+ SAW_ASSERT(data_) { return; }
+
+ /// @todo search appendages for result
+
+ auto &appendages = data_->appendages;
+ next_appendage_ = std::min(appendages.size(), next_appendage_);
+
+ for (size_t i = next_appendage_; i < appendages.size(); ++i) {
+ if (appendages[i]->queued() > 0) {
+ err_or_val = std::move(appendages[i]->error_or_value_.value());
+ appendages[i]->error_or_value_ = std::nullopt;
+ next_appendage_ = i + 1;
+ return;
+ }
+ }
+ for (size_t i = 0; i < next_appendage_; ++i) {
+ if (appendages[i]->queued() > 0) {
+ err_or_val = std::move(appendages[i]->error_or_value_.value());
+ appendages[i]->error_or_value_ = std::nullopt;
+ next_appendage_ = i + 1;
+ return;
+ }
+ }
+
+ err_or_val = critical_error("No value in Merge appendages");
+}
+
+template <typename T> void merge_conveyor_node<T>::fire() {
+ SAW_ASSERT(queued() > 0) { return; }
+
+ if (parent_) {
+ parent_->child_has_fired();
+
+ if (queued() > 0 && parent_->space() > 0) {
+ arm_later();
+ }
+ }
+}
+
+template <typename T> size_t merge_conveyor_node<T>::space() const { return 0; }
+
+template <typename T> size_t merge_conveyor_node<T>::queued() const {
+ SAW_ASSERT(data_) { return 0; }
+
+ size_t queue_count = 0;
+
+ for (auto &iter : data_->appendages) {
+ queue_count += iter->queued();
+ }
+
+ return queue_count;
+}
+
+template <typename T> void merge_conveyor_node<T>::child_has_fired() {
+ /// This can never happen
+ assert(false);
+}
+
+template <typename T> void merge_conveyor_node<T>::parent_has_fired() {
+ SAW_ASSERT(parent_) { return; }
+ if (queued() > 0) {
+ if (parent_->space() > 0) {
+ arm_later();
+ }
+ }
+}
+
+/**
+ * merge_conveyor_node<T>::Apendage
+ */
+
+template <typename T>
+error_or<own<conveyor_node>>
+merge_conveyor_node<T>::appendage::swap_child(own<conveyor_node> &&swapee_) {
+ own<conveyor_node> old_child = std::move(child);
+
+ child = std::move(swapee_);
+
+ // This case should never happen
+ SAW_ASSERT(old_child) { return critical_error("No child exists"); }
+
+ return old_child;
+}
+
+template <typename T>
+void merge_conveyor_node<T>::appendage::get_result(error_or_value &eov) {
+ error_or<fix_void<T>> &err_or_val = eov.as<fix_void<T>>();
+
+ SAW_ASSERT(queued() > 0) {
+ err_or_val =
+ critical_error("No element queued in Merge appendage Node");
+ return;
+ }
+
+ err_or_val = std::move(error_or_value_.value());
+ error_or_value_ = std::nullopt;
+}
+
+template <typename T> size_t merge_conveyor_node<T>::appendage::space() const {
+ SAW_ASSERT(merger) { return 0; }
+
+ if (error_or_value_.has_value()) {
+ return 0;
+ }
+
+ return 1;
+}
+
+template <typename T> size_t merge_conveyor_node<T>::appendage::queued() const {
+ SAW_ASSERT(merger) { return 0; }
+
+ if (error_or_value_.has_value()) {
+ return 1;
+ }
+
+ return 0;
+}
+
+/// @todo delete this function. Replaced by the regular get_result
+template <typename T>
+void merge_conveyor_node<T>::appendage::get_appendage_result(
+ error_or_value &eov) {
+ error_or<fix_void<T>> &err_or_val = eov.as<fix_void<T>>();
+
+ SAW_ASSERT(queued() > 0) {
+ err_or_val =
+ critical_error("No element queued in Merge appendage Node");
+ return;
+ }
+
+ err_or_val = std::move(error_or_value_.value());
+ error_or_value_ = std::nullopt;
+}
+
+template <typename T>
+void merge_conveyor_node<T>::appendage::child_has_fired() {
+ SAW_ASSERT(!error_or_value_.has_value()) { return; }
+ error_or<fix_void<T>> eov;
+ child->get_result(eov);
+
+ error_or_value_ = std::move(eov);
+
+ if (!merger->is_armed()) {
+ merger->arm_later();
+ }
+}
+
+template <typename T>
+void merge_conveyor_node<T>::appendage::parent_has_fired() {
+ conveyor_storage *child_storage = child->next_storage();
+ if (child_storage) {
+ child_storage->parent_has_fired();
+ }
+}
+
+template <typename T>
+void merge_conveyor_node<T>::appendage::set_parent(conveyor_storage *par) {
+ SAW_ASSERT(merger) { return; }
+
+ SAW_ASSERT(child) { return; }
+
+ parent_ = par;
+}
+
+template <typename T>
+void merge_conveyor_node_data<T>::attach(conveyor<T> conv) {
+ auto nas = conveyor<T>::from_conveyor(std::move(conv));
+ SAW_ASSERT(nas) { return; }
+ conveyor_storage *storage = nas->next_storage();
+ SAW_ASSERT(storage) { return; }
+
+ auto merge_node_appendage =
+ heap<typename merge_conveyor_node<T>::appendage>(std::move(nas),
+ *merger);
+ auto merge_node_appendage_ptr = merge_node_appendage.get();
+
+ storage->set_parent(merge_node_appendage.get());
+
+ SAW_ASSERT(merger) { return; }
+
+ conveyor_storage *mrg_storage = merger->next_storage();
+ SAW_ASSERT(mrg_storage) { return; }
+
+ merge_node_appendage->set_parent(mrg_storage);
+
+ appendages.push_back(std::move(merge_node_appendage));
+
+ /// @todo return this. necessary? maybe for the weird linking setup
+ /// maybe not
+ // return merge_node_appendage_ptr;
+}
+
+template <typename T>
+void merge_conveyor_node_data<T>::governing_node_destroyed() {
+ appendages.clear();
+ merger = nullptr;
+}
+
+template <typename T> adapt_conveyor_feeder<T>::~adapt_conveyor_feeder() {
+ if (feedee_) {
+ feedee_->set_feeder(nullptr);
+ feedee_ = nullptr;
+ }
+}
+
+template <typename T>
+void adapt_conveyor_feeder<T>::set_feedee(adapt_conveyor_node<T> *feedee_p) {
+ feedee_ = feedee_p;
+}
+
+template <typename T> void adapt_conveyor_feeder<T>::feed(T &&value) {
+ if (feedee_) {
+ feedee_->feed(std::move(value));
+ }
+}
+
+template <typename T> void adapt_conveyor_feeder<T>::fail(error &&error) {
+ if (feedee_) {
+ feedee_->fail(std::move(error));
+ }
+}
+
+template <typename T> size_t adapt_conveyor_feeder<T>::queued() const {
+ if (feedee_) {
+ return feedee_->queued();
+ }
+ return 0;
+}
+
+template <typename T> size_t adapt_conveyor_feeder<T>::space() const {
+ if (feedee_) {
+ return feedee_->space();
+ }
+ return 0;
+}
+
+template <typename T>
+error adapt_conveyor_feeder<T>::swap(conveyor<T> &&conv) noexcept {
+ SAW_ASSERT(feedee_) { return critical_error("No feedee connected"); }
+
+ auto node = conveyor<T>::from_conveyor(std::move(conv));
+
+ feedee_->swap_child(std::move(node));
+
+ return no_error();
+}
+
+template <typename T>
+adapt_conveyor_node<T>::adapt_conveyor_node() : conveyor_event_storage{} {}
+
+template <typename T> adapt_conveyor_node<T>::~adapt_conveyor_node() {
+ if (feeder_) {
+ feeder_->set_feedee(nullptr);
+ feeder_ = nullptr;
+ }
+}
+
+template <typename T>
+error_or<own<conveyor_node>>
+adapt_conveyor_node<T>::swap_child(own<conveyor_node> &&swapee) noexcept {
+ // This should return the owning pointer of this instance
+ auto myself_err = parent_node_.swap_child_of_parent(std::move(swapee));
+
+ if (myself_err.is_error()) {
+ return myself_err;
+ }
+
+ auto &myself = myself_err.value();
+
+ assert(myself.get() == this);
+
+ return myself_err;
+}
+
+template <typename T>
+conveyor_storage *adapt_conveyor_node<T>::next_storage() noexcept {
+ return static_cast<conveyor_storage *>(this);
+}
+
+template <typename T>
+void adapt_conveyor_node<T>::notify_parent_attached(
+ conveyor_node &par) noexcept {
+ parent_node_.change_parent(&par);
+}
+
+template <typename T>
+void adapt_conveyor_node<T>::set_feeder(adapt_conveyor_feeder<T> *feeder_p) {
+ feeder_ = feeder_p;
+}
+
+template <typename T> void adapt_conveyor_node<T>::feed(T &&value) {
+ storage_.push(std::move(value));
+ arm_next();
+}
+
+template <typename T> void adapt_conveyor_node<T>::fail(error &&error) {
+ storage_.push(std::move(error));
+ arm_next();
+}
+
+template <typename T> size_t adapt_conveyor_node<T>::queued() const {
+ return storage_.size();
+}
+
+template <typename T> size_t adapt_conveyor_node<T>::space() const {
+ return std::numeric_limits<size_t>::max() - storage_.size();
+}
+
+template <typename T>
+void adapt_conveyor_node<T>::get_result(error_or_value &err_or_val) {
+ if (!storage_.empty()) {
+ err_or_val.as<T>() = std::move(storage_.front());
+ storage_.pop();
+ } else {
+ err_or_val.as<T>() = critical_error(
+ "Signal for retrieval of storage sent even though no "
+ "data is present");
+ }
+}
+
+template <typename T> void adapt_conveyor_node<T>::child_has_fired() {
+ // Adapt node has no children
+ assert(false);
+}
+
+template <typename T> void adapt_conveyor_node<T>::parent_has_fired() {
+ SAW_ASSERT(parent_) { return; }
+
+ if (parent_->space() == 0) {
+ return;
+ }
+}
+
+template <typename T> void adapt_conveyor_node<T>::fire() {
+ if (parent_) {
+ parent_->child_has_fired();
+
+ if (storage_.size() > 0) {
+ arm_later();
+ }
+ }
+}
+
+template <typename T> one_time_conveyor_feeder<T>::~one_time_conveyor_feeder() {
+ if (feedee_) {
+ feedee_->set_feeder(nullptr);
+ feedee_ = nullptr;
+ }
+}
+
+template <typename T>
+void one_time_conveyor_feeder<T>::set_feedee(
+ one_time_conveyor_node<T> *feedee_p) {
+ feedee_ = feedee_p;
+}
+
+template <typename T> void one_time_conveyor_feeder<T>::feed(T &&value) {
+ if (feedee_) {
+ feedee_->feed(std::move(value));
+ }
+}
+
+template <typename T> void one_time_conveyor_feeder<T>::fail(error &&error) {
+ if (feedee_) {
+ feedee_->fail(std::move(error));
+ }
+}
+
+template <typename T> size_t one_time_conveyor_feeder<T>::queued() const {
+ if (feedee_) {
+ return feedee_->queued();
+ }
+ return 0;
+}
+
+template <typename T> size_t one_time_conveyor_feeder<T>::space() const {
+ if (feedee_) {
+ return feedee_->space();
+ }
+ return 0;
+}
+
+template <typename T> one_time_conveyor_node<T>::~one_time_conveyor_node() {
+ if (feeder_) {
+ feeder_->set_feedee(nullptr);
+ feeder_ = nullptr;
+ }
+}
+
+template <typename T>
+void one_time_conveyor_node<T>::set_feeder(
+ one_time_conveyor_feeder<T> *feeder_p) {
+ feeder_ = feeder_p;
+}
+
+template <typename T> void one_time_conveyor_node<T>::feed(T &&value) {
+ storage_ = std::move(value);
+ arm_next();
+}
+
+template <typename T> void one_time_conveyor_node<T>::fail(error &&error) {
+ storage_ = std::move(error);
+ arm_next();
+}
+
+template <typename T> size_t one_time_conveyor_node<T>::queued() const {
+ return storage_.has_value() ? 1 : 0;
+}
+
+template <typename T> size_t one_time_conveyor_node<T>::space() const {
+ return passed_ ? 0 : 1;
+}
+
+template <typename T>
+void one_time_conveyor_node<T>::get_result(error_or_value &err_or_val) {
+ if (storage_.has_value()) {
+ err_or_val.as<T>() = std::move(storage_.value());
+ storage_ = std::nullopt;
+ } else {
+ err_or_val.as<T>() = critical_error(
+ "Signal for retrieval of storage sent even though no "
+ "data is present");
+ }
+}
+
+template <typename T> void one_time_conveyor_node<T>::fire() {
+ if (parent_) {
+ parent_->child_has_fired();
+ }
+}
+
+} // namespace saw