summaryrefslogtreecommitdiff
path: root/c++/async
diff options
context:
space:
mode:
authorClaudius "keldu" Holeksa <mail@keldu.de>2023-12-04 12:18:14 +0100
committerClaudius "keldu" Holeksa <mail@keldu.de>2023-12-04 12:18:14 +0100
commita14896f9ed209dd3f9597722e5a5697bd7dbf531 (patch)
tree089ca5cbbd206d1921f8f6b53292f5bc1902ca5c /c++/async
parent84ecdcbca9e55b1f57fbb832e12ff4fdbb86e7c9 (diff)
meta: Renamed folder containing source
Diffstat (limited to 'c++/async')
-rw-r--r--c++/async/.nix/derivation.nix28
-rw-r--r--c++/async/SConscript38
-rw-r--r--c++/async/SConstruct66
-rw-r--r--c++/async/async.cpp419
-rw-r--r--c++/async/async.h1023
-rw-r--r--c++/async/async.tmpl.h767
6 files changed, 0 insertions, 2341 deletions
diff --git a/c++/async/.nix/derivation.nix b/c++/async/.nix/derivation.nix
deleted file mode 100644
index aad258f..0000000
--- a/c++/async/.nix/derivation.nix
+++ /dev/null
@@ -1,28 +0,0 @@
-{ lib
-, stdenv
-, scons
-, clang-tools
-, version
-, forstio
-}:
-
-let
-
-in stdenv.mkDerivation {
- pname = "forstio-async";
- inherit version;
- src = ./..;
-
- enableParallelBuilding = true;
-
- nativeBuildInputs = [
- scons
- clang-tools
- ];
-
- buildInputs = [
- forstio.core
- ];
-
- outputs = ["out" "dev"];
-}
diff --git a/c++/async/SConscript b/c++/async/SConscript
deleted file mode 100644
index 69f8950..0000000
--- a/c++/async/SConscript
+++ /dev/null
@@ -1,38 +0,0 @@
-#!/bin/false
-
-import os
-import os.path
-import glob
-
-
-Import('env')
-
-dir_path = Dir('.').abspath
-
-# Environment for base library
-async_env = env.Clone();
-
-async_env.sources = sorted(glob.glob(dir_path + "/*.cpp"))
-async_env.headers = sorted(glob.glob(dir_path + "/*.h"))
-
-env.sources += async_env.sources;
-env.headers += async_env.headers;
-
-## Shared lib
-objects_shared = []
-async_env.add_source_files(objects_shared, async_env.sources, shared=True);
-async_env.library_shared = async_env.SharedLibrary('#build/forstio-async', [objects_shared]);
-
-## Static lib
-objects_static = []
-async_env.add_source_files(objects_static, async_env.sources, shared=False);
-async_env.library_static = async_env.StaticLibrary('#build/forstio-async', [objects_static]);
-
-# Set Alias
-env.Alias('library_async', [async_env.library_shared, async_env.library_static]);
-
-env.targets += ['library_async'];
-
-# Install
-env.Install('$prefix/lib/', [async_env.library_shared, async_env.library_static]);
-env.Install('$prefix/include/forstio/async/', [async_env.headers]);
diff --git a/c++/async/SConstruct b/c++/async/SConstruct
deleted file mode 100644
index 0d7b7c6..0000000
--- a/c++/async/SConstruct
+++ /dev/null
@@ -1,66 +0,0 @@
-#!/usr/bin/env python3
-
-import sys
-import os
-import os.path
-import glob
-import re
-
-
-if sys.version_info < (3,):
- def isbasestring(s):
- return isinstance(s,basestring)
-else:
- def isbasestring(s):
- return isinstance(s, (str,bytes))
-
-def add_kel_source_files(self, sources, filetype, lib_env=None, shared=False, target_post=""):
-
- if isbasestring(filetype):
- dir_path = self.Dir('.').abspath
- filetype = sorted(glob.glob(dir_path+"/"+filetype))
-
- for path in filetype:
- target_name = re.sub( r'(.*?)(\.cpp|\.c\+\+)', r'\1' + target_post, path )
- if shared:
- target_name+='.os'
- sources.append( self.SharedObject( target=target_name, source=path ) )
- else:
- target_name+='.o'
- sources.append( self.StaticObject( target=target_name, source=path ) )
- pass
-
-def isAbsolutePath(key, dirname, env):
- assert os.path.isabs(dirname), "%r must have absolute path syntax" % (key,)
-
-env_vars = Variables(
- args=ARGUMENTS
-)
-
-env_vars.Add('prefix',
- help='Installation target location of build results and headers',
- default='/usr/local/',
- validator=isAbsolutePath
-)
-
-env=Environment(ENV=os.environ, variables=env_vars, CPPPATH=[],
- CPPDEFINES=['SAW_UNIX'],
- CXXFLAGS=['-std=c++20','-g','-Wall','-Wextra'],
- LIBS=['forstio-core'])
-env.__class__.add_source_files = add_kel_source_files
-env.Tool('compilation_db');
-env.cdb = env.CompilationDatabase('compile_commands.json');
-
-env.objects = [];
-env.sources = [];
-env.headers = [];
-env.targets = [];
-
-Export('env')
-SConscript('SConscript')
-
-env.Alias('cdb', env.cdb);
-env.Alias('all', [env.targets]);
-env.Default('all');
-
-env.Alias('install', '$prefix')
diff --git a/c++/async/async.cpp b/c++/async/async.cpp
deleted file mode 100644
index 360e455..0000000
--- a/c++/async/async.cpp
+++ /dev/null
@@ -1,419 +0,0 @@
-#include "async.h"
-#include <forstio/core/common.h>
-#include <forstio/core/error.h>
-
-#include <algorithm>
-#include <cassert>
-
-namespace saw {
-namespace {
-thread_local event_loop *local_loop = nullptr;
-
-event_loop &current_event_loop() {
- event_loop *loop = local_loop;
- assert(loop);
- return *loop;
-}
-} // namespace
-
-conveyor_node::conveyor_node() {}
-
-conveyor_node_with_child_mixin::conveyor_node_with_child_mixin(
- own<conveyor_node> &&child_, conveyor_node &owner)
- : child{std::move(child_)} {
- assert(child);
-
- child->notify_parent_attached(owner);
-}
-
-error_or<own<conveyor_node>>
-conveyor_node_with_child_mixin::swap_child(own<conveyor_node> &&swapee) {
- SAW_ASSERT(child) {
- return make_error<err::invalid_state>("Child should exist if this function is called");
- }
- own<conveyor_node> old_child = std::move(child);
-
- /**
- * We need the parent of the old_child's next storage
- */
- conveyor_storage *old_storage = old_child->next_storage();
- conveyor_storage *old_storage_parent =
- old_storage ? old_storage->get_parent() : nullptr;
-
- /**
- * Swap in the new child
- */
- if (swapee) {
- child = std::move(swapee);
-
- /**
- * Then we need to set the new child's storage parent since the next
- * storage has a nullptr set And if the old_storage_parent is a nullptr,
- * then it doesn't matter. So we don't check for it
- */
- conveyor_storage *swapee_storage = child->next_storage();
- if (swapee_storage) {
- swapee_storage->set_parent(old_storage_parent);
- }
- }
-
- return old_child;
-}
-
-conveyor_storage::conveyor_storage() {}
-
-conveyor_storage::~conveyor_storage() {}
-
-conveyor_storage *conveyor_storage::get_parent() const { return parent_; }
-
-void conveyor_event_storage::set_parent(conveyor_storage *p) {
- /*
- * parent check isn't needed, but is used
- * for the assert, because the storage should
- * be armed if there was an element present
- * and a valid parent
- */
- if (/*!parent && */ p && !is_armed() && queued() > 0) {
- assert(!parent_);
- if (p->space() > 0) {
- arm_later();
- }
- }
-
- parent_ = p;
-}
-
-conveyor_event_storage::conveyor_event_storage() : conveyor_storage{} {}
-
-conveyor_base::conveyor_base(own<conveyor_node> &&node_p)
- : node_{std::move(node_p)} {}
-
-error propagate_error::operator()(const error &error) const {
- return error.copy_error();
-}
-
-error propagate_error::operator()(error &&err) { return std::move(err); }
-
-event::event() : event(current_event_loop()) {}
-
-event::event(event_loop &loop) : loop_{loop} {}
-
-event::~event() { disarm(); }
-
-void event::arm_next() {
- assert(&loop_ == local_loop);
- if (prev_ == nullptr) {
- // Push the next_insert_point back by one
- // and inserts itself before that
- next_ = *loop_.next_insert_point_;
- prev_ = loop_.next_insert_point_;
- *prev_ = this;
- if (next_) {
- next_->prev_ = &next_;
- }
-
- // Set the new insertion ptr location to next
- loop_.next_insert_point_ = &next_;
-
- // Pushes back the later insert point if it was pointing at the
- // previous event
- if (loop_.later_insert_point_ == prev_) {
- loop_.later_insert_point_ = &next_;
- }
-
- // If tail_ points at the same location then
- // we are at the end and have to update tail_ then.
- // Technically should be possible by checking if
- // next is a `nullptr`
- if (loop_.tail_ == prev_) {
- loop_.tail_ = &next_;
- }
-
- loop_.set_runnable(true);
- }
-}
-
-void event::arm_later() {
- assert(&loop_ == local_loop);
-
- if (prev_ == nullptr) {
- next_ = *loop_.later_insert_point_;
- prev_ = loop_.later_insert_point_;
- *prev_ = this;
- if (next_) {
- next_->prev_ = &next_;
- }
-
- loop_.later_insert_point_ = &next_;
- if (loop_.tail_ == prev_) {
- loop_.tail_ = &next_;
- }
-
- loop_.set_runnable(true);
- }
-}
-
-void event::arm_last() {
- assert(&loop_ == local_loop);
-
- if (prev_ == nullptr) {
- next_ = *loop_.later_insert_point_;
- prev_ = loop_.later_insert_point_;
- *prev_ = this;
- if (next_) {
- next_->prev_ = &next_;
- }
-
- if (loop_.tail_ == prev_) {
- loop_.tail_ = &next_;
- }
-
- loop_.set_runnable(true);
- }
-}
-
-void event::disarm() {
- if (prev_ != nullptr) {
- if (loop_.tail_ == &next_) {
- loop_.tail_ = prev_;
- }
-
- if (loop_.next_insert_point_ == &next_) {
- loop_.next_insert_point_ = prev_;
- }
-
- *prev_ = next_;
- if (next_) {
- next_->prev_ = prev_;
- }
-
- prev_ = nullptr;
- next_ = nullptr;
- }
-}
-
-bool event::is_armed() const { return prev_ != nullptr; }
-
-conveyor_sink::conveyor_sink() : node_{nullptr} {}
-
-conveyor_sink::conveyor_sink(own<conveyor_node> &&node_p)
- : node_{std::move(node_p)} {}
-
-void event_loop::set_runnable(bool runnable) { is_runnable_ = runnable; }
-
-event_loop::event_loop() {}
-
-event_loop::event_loop(own<class event_port> &&ep)
- : event_port_{std::move(ep)} {}
-
-event_loop::~event_loop() { assert(local_loop != this); }
-
-void event_loop::enter_scope() {
- assert(!local_loop);
- local_loop = this;
-}
-
-void event_loop::leave_scope() {
- assert(local_loop == this);
- local_loop = nullptr;
-}
-
-bool event_loop::turn_loop() {
- size_t turn_step = 0;
- while (head_ && turn_step < 65536) {
- if (!turn()) {
- return false;
- }
- ++turn_step;
- }
- return true;
-}
-
-bool event_loop::turn() {
- event *event = head_;
-
- if (!event) {
- return false;
- }
-
- head_ = event->next_;
- if (head_) {
- head_->prev_ = &head_;
- }
-
- next_insert_point_ = &head_;
- if (later_insert_point_ == &event->next_) {
- later_insert_point_ = &head_;
- }
- if (tail_ == &event->next_) {
- tail_ = &head_;
- }
-
- event->next_ = nullptr;
- event->prev_ = nullptr;
-
- next_insert_point_ = &head_;
-
- event->fire();
-
- return true;
-}
-
-bool event_loop::wait(const std::chrono::steady_clock::duration &duration) {
- if (event_port_) {
- event_port_->wait(duration);
- }
-
- return turn_loop();
-}
-
-bool event_loop::wait(const std::chrono::steady_clock::time_point &time_point) {
- if (event_port_) {
- event_port_->wait(time_point);
- }
-
- return turn_loop();
-}
-
-bool event_loop::wait() {
- if (event_port_) {
- event_port_->wait();
- }
-
- return turn_loop();
-}
-
-bool event_loop::poll() {
- if (event_port_) {
- event_port_->poll();
- }
-
- return turn_loop();
-}
-
-event_port *event_loop::get_event_port() { return event_port_.get(); }
-
-conveyor_sink_set &event_loop::daemon() {
- if (!daemon_sink_) {
- daemon_sink_ = heap<conveyor_sink_set>();
- }
- return *daemon_sink_;
-}
-
-wait_scope::wait_scope(event_loop &loop) : loop_{loop} { loop_.enter_scope(); }
-
-wait_scope::~wait_scope() { loop_.leave_scope(); }
-
-void wait_scope::wait() { loop_.wait(); }
-
-void wait_scope::wait(const std::chrono::steady_clock::duration &duration) {
- loop_.wait(duration);
-}
-
-void wait_scope::wait(const std::chrono::steady_clock::time_point &time_point) {
- loop_.wait(time_point);
-}
-
-void wait_scope::poll() { loop_.poll(); }
-
-error_or<own<conveyor_node>>
-convert_conveyor_node_base::swap_child(own<conveyor_node> &&swapee) noexcept {
- return child_mixin_.swap_child(std::move(swapee));
-}
-
-conveyor_storage *convert_conveyor_node_base::next_storage() noexcept {
- if (!child_mixin_.child) {
- return nullptr;
- }
- return child_mixin_.child->next_storage();
-}
-
-immediate_conveyor_node_base::immediate_conveyor_node_base()
- : conveyor_event_storage{} {}
-
-merge_conveyor_node_base::merge_conveyor_node_base()
- : conveyor_event_storage{} {}
-
-error_or<own<conveyor_node>> queue_buffer_conveyor_node_base::swap_child(
- own<conveyor_node> &&swapee_) noexcept {
- return child_mixin_.swap_child(std::move(swapee_));
-}
-
-void conveyor_sink_set::destroy_sink_conveyor_node(conveyor_node &node) {
- if (!is_armed()) {
- arm_last();
- }
-
- delete_nodes_.push(&node);
-}
-
-void conveyor_sink_set::fail(error &&error) {
- /// @todo call error_handler
-}
-
-conveyor_sink_set::conveyor_sink_set(event_loop &event_loop)
- : event{event_loop} {}
-
-void conveyor_sink_set::add(conveyor<void> &&sink) {
- auto nas = conveyor<void>::from_conveyor(std::move(sink));
- SAW_ASSERT(nas) { return; }
- conveyor_storage *storage = nas->next_storage();
-
- own<sink_conveyor_node> sink_node = nullptr;
- try {
- sink_node = heap<sink_conveyor_node>(std::move(nas), *this);
- } catch (std::bad_alloc &) {
- return;
- }
- if (storage) {
- storage->set_parent(sink_node.get());
- }
-
- sink_nodes_.emplace_back(std::move(sink_node));
-}
-
-void conveyor_sink_set::fire() {
- while (!delete_nodes_.empty()) {
- conveyor_node *node = delete_nodes_.front();
- /*auto erased = */ std::remove_if(sink_nodes_.begin(),
- sink_nodes_.end(),
- [node](own<conveyor_node> &element) {
- return node == element.get();
- });
- delete_nodes_.pop();
- }
-}
-
-convert_conveyor_node_base::convert_conveyor_node_base(own<conveyor_node> &&dep)
- : child_mixin_{std::move(dep), *this} {}
-
-void convert_conveyor_node_base::get_result(error_or_value &err_or_val) {
- get_impl(err_or_val);
-}
-
-void attach_conveyor_node_base::get_result(
- error_or_value &err_or_val) noexcept {
- if (child_mixin_.child) {
- child_mixin_.child->get_result(err_or_val);
- }
-}
-
-error_or<own<conveyor_node>>
-attach_conveyor_node_base::swap_child(own<conveyor_node> &&swapee_) noexcept {
- return child_mixin_.swap_child(std::move(swapee_));
-}
-
-conveyor_storage *attach_conveyor_node_base::next_storage() noexcept {
- if (!child_mixin_.child) {
- return nullptr;
- }
-
- return child_mixin_.child->next_storage();
-}
-
-void detach_conveyor(conveyor<void> &&conveyor) {
- event_loop &loop = current_event_loop();
- conveyor_sink_set &sink = loop.daemon();
- sink.add(std::move(conveyor));
-}
-} // namespace saw
diff --git a/c++/async/async.h b/c++/async/async.h
deleted file mode 100644
index 8190be0..0000000
--- a/c++/async/async.h
+++ /dev/null
@@ -1,1023 +0,0 @@
-#pragma once
-
-#include <forstio/core/common.h>
-#include <forstio/core/error.h>
-
-#include <chrono>
-#include <functional>
-#include <limits>
-#include <list>
-#include <queue>
-#include <type_traits>
-
-namespace saw {
-class conveyor_storage;
-class conveyor_node {
-public:
- conveyor_node();
- virtual ~conveyor_node() = default;
-
- /**
- * Internal method to retrieve results from children
- */
- virtual void get_result(error_or_value &err_or_val) = 0;
-
- /**
- * Swap out child with another one
- */
- virtual error_or<own<conveyor_node>>
- swap_child(own<conveyor_node> &&swapee_) = 0;
-
- /**
- * Retrieve the next storage node
- */
- virtual conveyor_storage *next_storage() = 0;
-
- /**
- * Notify that a new parent was attached
- * Only relevant for the feeding nodes
- */
- virtual void notify_parent_attached(conveyor_node &){};
-};
-
-class conveyor_node_with_child_mixin final {
-public:
- own<conveyor_node> child = nullptr;
-
- conveyor_node_with_child_mixin(own<conveyor_node> &&child_,
- conveyor_node &owner_);
- ~conveyor_node_with_child_mixin() = default;
-
- /**
- * Swap out children and return the child ptr, since the caller is the child
- * itself. Stack needs to be cleared before the child is destroyed, so the
- * swapped out node is returned as well.
- */
- error_or<own<conveyor_node>> swap_child(own<conveyor_node> &&swapee);
-};
-
-class conveyor_node_with_parent_mixin final {
-public:
- conveyor_node *parent = nullptr;
-
- error_or<own<conveyor_node>>
- swap_child_of_parent(own<conveyor_node> &&swapee) {
- SAW_ASSERT(parent) {
- return make_error<err::invalid_state>(
- "Can't swap child, because parent doesn't exist");
- }
-
- return parent->swap_child(std::move(swapee));
- }
- void change_parent(conveyor_node *p) { parent = p; }
-};
-
-class event_loop;
-class wait_scope;
-/*
- * Event class similar to capn'proto.
- * https://github.com/capnproto/capnproto
- */
-class event {
-private:
- event_loop &loop_;
- event **prev_ = nullptr;
- event *next_ = nullptr;
-
- friend class event_loop;
-
-public:
- event();
- event(event_loop &loop);
- virtual ~event();
-
- virtual void fire() = 0;
-
- void arm_next();
- void arm_later();
- void arm_last();
- void disarm();
-
- bool is_armed() const;
-};
-
-class conveyor_storage {
-protected:
- conveyor_storage *parent_ = nullptr;
-
-public:
- conveyor_storage();
- virtual ~conveyor_storage();
-
- virtual size_t space() const = 0;
- virtual size_t queued() const = 0;
- virtual void child_has_fired() = 0;
- virtual void parent_has_fired() = 0;
-
- virtual void set_parent(conveyor_storage *parent) = 0;
- conveyor_storage *get_parent() const;
-};
-
-class conveyor_event_storage : public conveyor_storage, public event {
-public:
- conveyor_event_storage();
- virtual ~conveyor_event_storage() = default;
-
- void set_parent(conveyor_storage *parent) override;
-};
-
-class conveyor_base {
-protected:
- own<conveyor_node> node_;
-
-public:
- conveyor_base(own<conveyor_node> &&node_p);
- virtual ~conveyor_base() = default;
-
- conveyor_base(conveyor_base &&) = default;
- conveyor_base &operator=(conveyor_base &&) = default;
-
- void get(error_or_value &err_or_val);
-};
-
-template <typename T> class conveyor;
-
-template <typename T> conveyor<T> chained_conveyor_type(T *);
-
-// template <typename T> Conveyor<T> chainedConveyorType(Conveyor<T> *);
-
-template <typename T> T remove_error_or_type(T *);
-
-template <typename T> T remove_error_or_type(error_or<T> *);
-
-template <typename T>
-using remove_error_or = decltype(remove_error_or_type((T *)nullptr));
-
-template <typename T>
-using chained_conveyors = decltype(chained_conveyor_type((T *)nullptr));
-
-template <typename Func, typename T>
-using conveyor_result =
- chained_conveyors<remove_error_or<return_type<Func, T>>>;
-
-struct propagate_error {
-public:
- error operator()(const error &err) const;
- error operator()(error &&err);
-};
-
-class conveyor_sink {
-private:
- own<conveyor_node> node_;
-
-public:
- conveyor_sink();
- conveyor_sink(own<conveyor_node> &&node);
-
- conveyor_sink(conveyor_sink &&) = default;
- conveyor_sink &operator=(conveyor_sink &&) = default;
-};
-
-template <typename T> class merge_conveyor_node_data;
-
-template <typename T> class merge_conveyor {
-private:
- lent<merge_conveyor_node_data<T>> data_;
-
-public:
- merge_conveyor() = default;
- merge_conveyor(lent<merge_conveyor_node_data<T>> d);
- ~merge_conveyor();
-
- void attach(conveyor<T> conv);
-};
-
-/**
- * Main interface for async operations.
- */
-template <typename T> class conveyor final : public conveyor_base {
-public:
- /**
- * Construct an immediately fulfilled node
- */
- conveyor(fix_void<T> value);
-
- /**
- * Construct an immediately failed node
- */
- conveyor(error &&err);
-
- /**
- * Construct a conveyor with a child node
- */
- conveyor(own<conveyor_node> node_p);
-
- conveyor(conveyor<T> &&) = default;
- conveyor<T> &operator=(conveyor<T> &&) = default;
-
- /**
- * This method converts values or errors from children
- */
- template <typename Func, typename ErrorFunc = propagate_error>
- [[nodiscard]] conveyor_result<Func, T>
- then(Func &&func, ErrorFunc &&error_func = propagate_error());
-
- /**
- * This method adds a buffer node in the conveyor chains which acts as a
- * scheduler interrupt point and collects elements up to the supplied limit.
- */
- [[nodiscard]] conveyor<T>
- buffer(size_t limit = std::numeric_limits<size_t>::max());
-
- /**
- * This method just takes ownership of any supplied types,
- * which are destroyed when the chain gets destroyed.
- * Useful for resource lifetime control.
- */
- template <typename... Args>
- [[nodiscard]] conveyor<T> attach(Args &&...args);
-
- /** @todo implement
- * This method limits the total amount of passed elements
- * Be careful where you place this node into the chain.
- * If you meant to fork it and destroy paths you shouldn't place
- * an interrupt point between the fork and this limiter
- */
- [[nodiscard]] conveyor<T> limit(size_t val = 1);
-
- /**
- *
- */
- [[nodiscard]] std::pair<conveyor<T>, merge_conveyor<T>> merge();
-
- /**
- * Moves the conveyor chain into a thread local storage point which drops
- * every element. Use sink() if you want to control the lifetime of a
- * conveyor chain
- */
- template <typename ErrorFunc = propagate_error>
- void detach(ErrorFunc &&err_func = propagate_error());
- /**
- * Creates a local sink which drops elements, but lifetime control remains
- * in your hand.
- */
- template <typename ErrorFunc = propagate_error>
- [[nodiscard]] conveyor_sink
- sink(ErrorFunc &&error_func = propagate_error());
-
- /**
- * If no sink() or detach() is used you have to take elements out of the
- * chain yourself.
- */
- error_or<fix_void<T>> take();
-
- /** @todo implement
- * Specifically pump elements through this chain with the provided
- * wait_scope
- */
- void poll(wait_scope &wait_scope);
-
- // helper
- static conveyor<T> to_conveyor(own<conveyor_node> node);
-
- // helper
- static own<conveyor_node> from_conveyor(conveyor<T> conveyor);
-};
-
-template <typename Func> conveyor_result<Func, void> exec_later(Func &&func);
-
-/*
- * Join Conveyors into a single one
- */
-template <typename... Args>
-conveyor<std::tuple<Args...>>
-join_conveyors(std::tuple<conveyor<Args>...> &conveyors);
-
-template <typename T> class conveyor_feeder {
-public:
- virtual ~conveyor_feeder() = default;
-
- virtual void feed(T &&data) = 0;
- virtual void fail(error &&error) = 0;
-
- virtual size_t space() const = 0;
- virtual size_t queued() const = 0;
-
- virtual error swap(conveyor<T> &&conveyor) noexcept = 0;
-};
-
-template <> class conveyor_feeder<void> {
-public:
- virtual ~conveyor_feeder() = default;
-
- virtual void feed(void_t &&value = void_t{}) = 0;
- virtual void fail(error &&error) = 0;
-
- virtual size_t space() const = 0;
- virtual size_t queued() const = 0;
-
- virtual error swap(conveyor<void_t> &&conveyor) noexcept = 0;
-};
-
-template <typename T> struct conveyor_and_feeder {
- own<conveyor_feeder<T>> feeder;
- class conveyor<T> conveyor;
-};
-
-template <typename T> conveyor_and_feeder<T> new_conveyor_and_feeder();
-
-template <typename T> conveyor_and_feeder<T> one_time_conveyor_and_feeder();
-
-enum class Signal : uint8_t { Terminate, User1 };
-
-/**
- * Class which acts as a correspondent between the running framework and outside
- * events which may be signals from the operating system or just other threads.
- * Default EventPorts are supplied by setupAsyncIo() in io.h
- */
-class event_port {
-public:
- virtual ~event_port() = default;
-
- virtual conveyor<void> on_signal(Signal signal) = 0;
-
- virtual void poll() = 0;
- virtual void wait() = 0;
- virtual void wait(const std::chrono::steady_clock::duration &) = 0;
- virtual void wait(const std::chrono::steady_clock::time_point &) = 0;
-
- virtual void wake() = 0;
-};
-
-class sink_conveyor_node;
-
-class conveyor_sink_set final : public event {
-private:
- /*
- class Helper final : public Event {
- private:
- void destroySinkConveyorNode(ConveyorNode& sink);
- void fail(Error&& error);
-
- std::vector<Own<ConveyorNode>> sink_nodes;
- std::queue<ConveyorNode*> delete_nodes;
- std::function<void(Error&& error)> error_handler;
-
- public:
- ConveyorSinks() = default;
- ConveyorSinks(EventLoop& event_loop);
-
- void add(Conveyor<void> node);
-
- void fire() override {}
- };
-
- gin::Own<Helper> helper;
- */
- friend class sink_conveyor_node;
-
- void destroy_sink_conveyor_node(conveyor_node &sink_node);
- void fail(error &&err);
-
- std::list<own<conveyor_node>> sink_nodes_;
-
- std::queue<conveyor_node *> delete_nodes_;
-
- std::function<void(error &&)> error_handler_;
-
-public:
- // ConveyorSinks();
- // ConveyorSinks(EventLoop& event_loop);
- conveyor_sink_set() = default;
- conveyor_sink_set(event_loop &event_loop);
-
- void add(conveyor<void> &&node);
-
- void fire() override;
-};
-
-/*
- * EventLoop class similar to capn'proto.
- * https://github.com/capnproto/capnproto
- */
-class event_loop {
-private:
- friend class event;
- event *head_ = nullptr;
- event **tail_ = &head_;
- event **next_insert_point_ = &head_;
- event **later_insert_point_ = &head_;
-
- bool is_runnable_ = false;
-
- own<event_port> event_port_ = nullptr;
-
- own<conveyor_sink_set> daemon_sink_ = nullptr;
-
- // functions
- void set_runnable(bool runnable);
-
- friend class wait_scope;
- void enter_scope();
- void leave_scope();
-
- bool turn_loop();
- bool turn();
-
-public:
- event_loop();
- event_loop(own<event_port> &&port);
- ~event_loop();
-
- event_loop(event_loop &&) = default;
- event_loop &operator=(event_loop &&) = default;
-
- bool wait();
- bool wait(const std::chrono::steady_clock::duration &);
- bool wait(const std::chrono::steady_clock::time_point &);
- bool poll();
-
- event_port *get_event_port();
-
- conveyor_sink_set &daemon();
-};
-
-/*
- * WaitScope class similar to capn'proto.
- * https://github.com/capnproto/capnproto
- */
-class wait_scope {
-private:
- event_loop &loop_;
-
-public:
- wait_scope(event_loop &loop);
- ~wait_scope();
-
- void wait();
- void wait(const std::chrono::steady_clock::duration &);
- void wait(const std::chrono::steady_clock::time_point &);
- void poll();
-};
-
-template <typename Func> conveyor_result<Func, void> yield_next(Func &&func);
-
-template <typename Func> conveyor_result<Func, void> yield_later(Func &&func);
-
-template <typename Func> conveyor_result<Func, void> yield_last(Func &&func);
-} // namespace saw
-
-// Secret stuff
-// Aka private semi hidden classes
-namespace saw {
-
-template <typename Out, typename In> struct fix_void_caller {
- template <typename Func> static Out apply(Func &func, In &&in) {
- return func(std::move(in));
- }
-};
-
-template <typename Out> struct fix_void_caller<Out, void_t> {
- template <typename Func> static Out apply(Func &func, void_t &&in) {
- (void)in;
- return func();
- }
-};
-
-template <typename In> struct fix_void_caller<void_t, In> {
- template <typename Func> static void_t apply(Func &func, In &&in) {
- func(std::move(in));
- return void_t{};
- }
-};
-
-template <> struct fix_void_caller<void_t, void_t> {
- template <typename Func> static void_t apply(Func &func, void_t &&in) {
- (void)in;
- func();
- return void_t{};
- }
-};
-
-template <typename T> class adapt_conveyor_node;
-
-template <typename T>
-class adapt_conveyor_feeder final : public conveyor_feeder<unfix_void<T>> {
-private:
- adapt_conveyor_node<T> *feedee_ = nullptr;
-
-public:
- ~adapt_conveyor_feeder();
-
- void set_feedee(adapt_conveyor_node<T> *feedee);
-
- void feed(T &&value) override;
- void fail(error &&error) override;
-
- size_t space() const override;
- size_t queued() const override;
-
- error swap(conveyor<T> &&conv) noexcept override;
-};
-
-template <typename T>
-class adapt_conveyor_node final : public conveyor_node,
- public conveyor_event_storage {
-private:
- adapt_conveyor_feeder<T> *feeder_ = nullptr;
-
- std::queue<error_or<unfix_void<T>>> storage_;
-
- conveyor_node_with_parent_mixin parent_node_;
-
-public:
- adapt_conveyor_node();
- ~adapt_conveyor_node();
-
- void set_feeder(adapt_conveyor_feeder<T> *feeder);
-
- void feed(T &&value);
- void fail(error &&error);
-
- // ConveyorNode
- void get_result(error_or_value &err_or_val) override;
-
- error_or<own<conveyor_node>>
- swap_child(own<conveyor_node> &&swapee) noexcept override;
-
- conveyor_storage *next_storage() noexcept override;
- void notify_parent_attached(conveyor_node &) noexcept override;
-
- // ConveyorStorage
- size_t space() const override;
- size_t queued() const override;
-
- void child_has_fired() override;
- void parent_has_fired() override;
-
- // Event
- void fire() override;
-};
-
-template <typename T> class one_time_conveyor_node;
-
-template <typename T>
-class one_time_conveyor_feeder final : public conveyor_feeder<unfix_void<T>> {
-private:
- one_time_conveyor_node<T> *feedee_ = nullptr;
-
-public:
- ~one_time_conveyor_feeder();
-
- void set_feedee(one_time_conveyor_node<T> *feedee);
-
- void feed(T &&value) override;
- void fail(error &&error) override;
-
- size_t space() const override;
- size_t queued() const override;
-};
-
-template <typename T>
-class one_time_conveyor_node final : public conveyor_node,
- public conveyor_storage,
- public event {
-private:
- one_time_conveyor_feeder<T> *feeder_ = nullptr;
-
- bool passed_ = false;
- maybe<error_or<T>> storage_ = std::nullopt;
-
-public:
- ~one_time_conveyor_node();
-
- void set_feeder(one_time_conveyor_feeder<T> *feeder);
-
- void feed(T &&value);
- void fail(error &&error);
-
- // ConveyorNode
- void get_result(error_or_value &err_or_val) override;
-
- error_or<own<conveyor_node>>
- swap_child(own<conveyor_node> &&swapee) override;
-
- // ConveyorStorage
- size_t space() const override;
- size_t queued() const override;
-
- void child_has_fired() override {}
- void parent_has_fired() override;
-
- // Event
- void fire() override;
-};
-
-/**
- * This class buffers and saves incoming elements and acts as an interrupt node
- * for processing calls
- */
-class queue_buffer_conveyor_node_base : public conveyor_node,
- public conveyor_event_storage {
-protected:
- conveyor_node_with_child_mixin child_mixin_;
-
-public:
- queue_buffer_conveyor_node_base(own<conveyor_node> child_)
- : conveyor_event_storage{}, child_mixin_{std::move(child_), *this} {}
- virtual ~queue_buffer_conveyor_node_base() = default;
-
- /**
- * Use mixin
- */
- error_or<own<conveyor_node>>
- swap_child(own<conveyor_node> &&swapee_) noexcept override;
-
- conveyor_storage *next_storage() noexcept override {
- return static_cast<conveyor_storage *>(this);
- }
-};
-
-template <typename T>
-class queue_buffer_conveyor_node final
- : public queue_buffer_conveyor_node_base {
-private:
- std::queue<error_or<T>> storage_;
- size_t max_store_;
-
-public:
- queue_buffer_conveyor_node(own<conveyor_node> dep, size_t max_size)
- : queue_buffer_conveyor_node_base{std::move(dep)}, max_store_{
- max_size} {}
- // Event
- void fire() override;
- // ConveyorNode
- void get_result(error_or_value &eov) noexcept override;
-
- // ConveyorStorage
- size_t space() const override;
- size_t queued() const override;
-
- void child_has_fired() override;
- void parent_has_fired() override;
-};
-
-class attach_conveyor_node_base : public conveyor_node {
-protected:
- conveyor_node_with_child_mixin child_mixin_;
-
-public:
- attach_conveyor_node_base(own<conveyor_node> &&child_)
- : child_mixin_{std::move(child_), *this} {}
-
- virtual ~attach_conveyor_node_base() = default;
-
- void get_result(error_or_value &err_or_val) noexcept override;
-
- /**
- * Use mixin
- */
- error_or<own<conveyor_node>>
- swap_child(own<conveyor_node> &&swapee_) noexcept override;
-
- conveyor_storage *next_storage() noexcept override;
-};
-
-template <typename... Args>
-class attach_conveyor_node final : public attach_conveyor_node_base {
-public:
- attach_conveyor_node(own<conveyor_node> &&dep, Args &&...args)
- : attach_conveyor_node_base(std::move(dep)), attached_data_{
- std::move(args...)} {}
-
-private:
- std::tuple<Args...> attached_data_;
-};
-
-class convert_conveyor_node_base : public conveyor_node {
-public:
- convert_conveyor_node_base(own<conveyor_node> &&dep);
- virtual ~convert_conveyor_node_base() = default;
-
- void get_result(error_or_value &err_or_val) override;
-
- virtual void get_impl(error_or_value &err_or_val) = 0;
-
- error_or<own<conveyor_node>>
- swap_child(own<conveyor_node> &&swapee) noexcept override;
-
- conveyor_storage *next_storage() noexcept override;
-
-protected:
- conveyor_node_with_child_mixin child_mixin_;
-};
-
-template <typename T, typename DepT, typename Func, typename ErrorFunc>
-class convert_conveyor_node final : public convert_conveyor_node_base {
-private:
- Func func_;
- ErrorFunc error_func_;
-
- static_assert(std::is_same<DepT, remove_error_or<DepT>>::value,
- "Should never be of type ErrorOr");
-
-public:
- convert_conveyor_node(own<conveyor_node> &&dep, Func &&func,
- ErrorFunc &&error_func)
- : convert_conveyor_node_base(std::move(dep)), func_{std::move(func)},
- error_func_{std::move(error_func)} {}
-
- void get_impl(error_or_value &err_or_val) noexcept override {
- error_or<unfix_void<DepT>> dep_eov;
- error_or<unfix_void<remove_error_or<T>>> &eov =
- err_or_val.as<unfix_void<remove_error_or<T>>>();
- if (child_mixin_.child) {
- child_mixin_.child->get_result(dep_eov);
- if (dep_eov.is_value()) {
- try {
-
- eov = fix_void_caller<T, DepT>::apply(
- func_, std::move(dep_eov.get_value()));
- } catch (const std::bad_alloc &) {
- eov = make_error<err::out_of_memory>("Out of memory");
- } catch (const std::exception &) {
- eov = make_error<err::invalid_state>(
- "Exception in chain occured. Return ErrorOr<T> if you "
- "want to handle errors which are recoverable");
- }
- } else if (dep_eov.is_error()) {
- eov = error_func_(std::move(dep_eov.get_error()));
- } else {
- eov = make_error<err::invalid_state>("No value set in dependency");
- }
- } else {
- eov = make_error<err::invalid_state>("Conveyor doesn't have child");
- }
- }
-};
-
-class sink_conveyor_node final : public conveyor_node,
- public conveyor_event_storage {
-private:
- conveyor_node_with_child_mixin child_mixin_;
- conveyor_sink_set *conveyor_sink_;
-
-public:
- sink_conveyor_node(own<conveyor_node> node, conveyor_sink_set &conv_sink)
- : conveyor_event_storage{}, child_mixin_{std::move(node), *this},
- conveyor_sink_{&conv_sink} {}
-
- sink_conveyor_node(own<conveyor_node> node)
- : conveyor_event_storage{}, child_mixin_{std::move(node), *this},
- conveyor_sink_{nullptr} {}
-
- // Event only queued if a critical error occured
- void fire() override {
- // Queued for destruction of children, because this acts as a sink and
- // no other event should be here
- child_mixin_.child = nullptr;
-
- if (conveyor_sink_) {
- conveyor_sink_->destroy_sink_conveyor_node(*this);
- conveyor_sink_ = nullptr;
- }
- }
-
- // ConveyorStorage
- size_t space() const override { return 1; }
- size_t queued() const override { return 0; }
-
- // ConveyorNode
- void get_result(error_or_value &err_or_val) noexcept override {
- err_or_val.as<void_t>() =
- make_error<err::invalid_state>("In a sink node no result can be returned");
- }
-
- error_or<own<conveyor_node>>
- swap_child(own<conveyor_node> &&swapee) noexcept override {
- return child_mixin_.swap_child(std::move(swapee));
- }
-
- // ConveyorStorage
- void child_has_fired() override {
- if (child_mixin_.child) {
- error_or<void> dep_eov;
- child_mixin_.child->get_result(dep_eov);
- if (dep_eov.is_error()) {
- if (dep_eov.get_error().is_critical()) {
- if (!is_armed()) {
- arm_last();
- }
- }
- if (conveyor_sink_) {
- conveyor_sink_->fail(std::move(dep_eov.get_error()));
- }
- }
- }
- }
-
- /*
- * No parent needs to be fired since we always have space
- */
- void parent_has_fired() override {}
-
- conveyor_storage *next_storage() override {
- // Should never happen though
- assert(false);
- return nullptr;
- // return static_cast<ConveyorStorage*>(this);
- }
-};
-
-class immediate_conveyor_node_base : public conveyor_node,
- public conveyor_event_storage {
-private:
-public:
- immediate_conveyor_node_base();
-
- virtual ~immediate_conveyor_node_base() = default;
-
- error_or<own<conveyor_node>>
- swap_child(own<conveyor_node> &&swapee) noexcept override {
- (void)swapee;
- return make_error<err::not_supported>("Node doesn't support swapping");
- }
-
- conveyor_storage *next_storage() noexcept override {
- return static_cast<conveyor_storage *>(this);
- }
-};
-
-template <typename T>
-class immediate_conveyor_node final : public immediate_conveyor_node_base {
-private:
- error_or<fix_void<T>> value_;
- uint8_t retrieved_;
-
-public:
- immediate_conveyor_node(fix_void<T> &&val);
- immediate_conveyor_node(error &&error);
-
- // ConveyorStorage
- size_t space() const override;
- size_t queued() const override;
-
- void child_has_fired() override;
- void parent_has_fired() override;
-
- // ConveyorNode
- void get_result(error_or_value &err_or_val) noexcept override {
- if (retrieved_ > 0) {
- err_or_val.as<fix_void<T>>() =
- make_error<err::buffer_exhausted>("Already taken value");
- } else {
- err_or_val.as<fix_void<T>>() = std::move(value_);
- }
- if (queued() > 0) {
- ++retrieved_;
- }
- }
-
- // Event
- void fire() override;
-};
-
-/*
- * Collects every incoming value and throws it in one lane
- */
-class merge_conveyor_node_base : public conveyor_node,
- public conveyor_event_storage {
-public:
- merge_conveyor_node_base();
-
- virtual ~merge_conveyor_node_base() = default;
-
- conveyor_storage *next_storage() noexcept override {
- return static_cast<conveyor_storage *>(this);
- }
-};
-
-template <typename T>
-class merge_conveyor_node : public merge_conveyor_node_base {
-private:
- class appendage final : public conveyor_node, public conveyor_storage {
- public:
- own<conveyor_node> child;
- merge_conveyor_node *merger;
-
- maybe<error_or<fix_void<T>>> error_or_value_;
-
- public:
- appendage(own<conveyor_node> n, merge_conveyor_node &m)
- : conveyor_storage{}, child{std::move(n)}, merger{&m},
- error_or_value_{std::nullopt} {}
-
- bool child_storage_has_element_queued() const {
- if (!child) {
- return false;
- }
- conveyor_storage *storage = child->next_storage();
- if (storage) {
- return storage->queued() > 0;
- }
- return false;
- }
-
- void get_appendage_result(error_or_value &eov);
-
- /**
- * ConveyorNode
- */
- error_or<own<conveyor_node>>
- swap_child(own<conveyor_node> &&swapee_) override;
-
- conveyor_storage *next_storage() noexcept override {
- return static_cast<conveyor_storage *>(this);
- }
-
- void get_result(error_or_value &err_or_val) override;
-
- /**
- * ConveyorStorage
- */
- size_t space() const override;
-
- size_t queued() const override;
-
- void child_has_fired() override;
-
- void parent_has_fired() override;
-
- void set_parent(conveyor_storage *par) override;
- };
-
- friend class merge_conveyor_node_data<T>;
- friend class appendage;
-
- our<merge_conveyor_node_data<T>> data_;
- size_t next_appendage_ = 0;
-
-public:
- merge_conveyor_node(our<merge_conveyor_node_data<T>> data);
- ~merge_conveyor_node();
- // ConveyorNode
- error_or<own<conveyor_node>>
- swap_child(own<conveyor_node> &&c) noexcept override;
-
- // Event
- void get_result(error_or_value &err_or_val) noexcept override;
-
- void fire() override;
-
- // ConveyorStorage
- size_t space() const override;
- size_t queued() const override;
- void child_has_fired() override;
- void parent_has_fired() override;
-};
-
-template <typename T> class merge_conveyor_node_data {
-public:
- std::vector<own<typename merge_conveyor_node<T>::appendage>> appendages;
-
- merge_conveyor_node<T> *merger = nullptr;
-
-public:
- void attach(conveyor<T> conv);
-
- void governing_node_destroyed();
-};
-
-/*
-class JoinConveyorNodeBase : public ConveyorNode, public ConveyorEventStorage {
-private:
-
-public:
-};
-
-template <typename... Args>
-class JoinConveyorNode final : public JoinConveyorNodeBase {
-private:
- template<typename T>
- class Appendage : public ConveyorEventStorage {
- private:
- Maybe<T> data = std::nullopt;
-
- public:
- size_t space() const override;
- size_t queued() const override;
-
- void fire() override;
- void get_result(ErrorOrValue& eov) override;
- };
-
- std::tuple<Appendage<Args>...> appendages;
-
-public:
-};
-
-*/
-
-} // namespace saw
-
-#include "async.tmpl.h"
diff --git a/c++/async/async.tmpl.h b/c++/async/async.tmpl.h
deleted file mode 100644
index 9569f60..0000000
--- a/c++/async/async.tmpl.h
+++ /dev/null
@@ -1,767 +0,0 @@
-#pragma once
-
-#include <forstio/core/common.h>
-#include <forstio/core/error.h>
-
-#include <cassert>
-// Template inlining
-
-namespace saw {
-
-template <typename Func> conveyor_result<Func, void> execLater(Func &&func) {
- conveyor<void> conveyor{fix_void<void>{}};
- return conveyor.then(std::move(func));
-}
-
-template <typename T>
-conveyor<T>::conveyor(fix_void<T> value) : conveyor_base(nullptr) {
- // Is there any way to do this?
- // @todo new conveyor_base constructor for Immediate values
-
- own<immediate_conveyor_node<fix_void<T>>> immediate =
- heap<immediate_conveyor_node<fix_void<T>>>(std::move(value));
-
- if (!immediate) {
- return;
- }
-
- node_ = std::move(immediate);
-}
-
-template <typename T>
-conveyor<T>::conveyor(error &&err) : conveyor_base(nullptr) {
- own<immediate_conveyor_node<fix_void<T>>> immediate =
- heap<immediate_conveyor_node<fix_void<T>>>(std::move(err));
-
- if (!immediate) {
- return;
- }
-
- node_ = std::move(immediate);
-}
-
-template <typename T>
-conveyor<T>::conveyor(own<conveyor_node> node_p)
- : conveyor_base{std::move(node_p)} {}
-
-template <typename T>
-template <typename Func, typename ErrorFunc>
-conveyor_result<Func, T> conveyor<T>::then(Func &&func,
- ErrorFunc &&error_func) {
- own<conveyor_node> conversion_node =
- heap<convert_conveyor_node<fix_void<return_type<Func, T>>, fix_void<T>,
- Func, ErrorFunc>>(
- std::move(node_), std::move(func), std::move(error_func));
-
- return conveyor<remove_error_or<return_type<Func, T>>>::to_conveyor(
- std::move(conversion_node));
-}
-
-template <typename T> conveyor<T> conveyor<T>::buffer(size_t size) {
- SAW_ASSERT(node_) { return conveyor<T>{own<conveyor_node>{nullptr}}; }
- conveyor_storage *storage = node_->next_storage();
- SAW_ASSERT(storage) { return conveyor<T>{own<conveyor_node>{nullptr}}; }
-
- own<queue_buffer_conveyor_node<fix_void<T>>> storage_node =
- heap<queue_buffer_conveyor_node<fix_void<T>>>(std::move(node_), size);
-
- conveyor_storage *storage_ptr =
- static_cast<conveyor_storage *>(storage_node.get());
-
- storage->set_parent(storage_ptr);
- return conveyor<T>{std::move(storage_node)};
-}
-
-template <typename T>
-template <typename... Args>
-conveyor<T> conveyor<T>::attach(Args &&...args) {
- own<attach_conveyor_node<Args...>> attach_node =
- heap<attach_conveyor_node<Args...>>(std::move(node_),
- std::move(args...));
- return conveyor<T>{std::move(attach_node)};
-}
-
-template <typename T>
-std::pair<conveyor<T>, merge_conveyor<T>> conveyor<T>::merge() {
- our<merge_conveyor_node_data<T>> data =
- share<merge_conveyor_node_data<T>>();
-
- own<merge_conveyor_node<T>> merge_node = heap<merge_conveyor_node<T>>(data);
-
- SAW_ASSERT(node_) {
- return std::make_pair(conveyor<T>{own<conveyor_node>{nullptr}},
- merge_conveyor<T>{});
- }
- conveyor_storage *storage = node_->next_storage();
- SAW_ASSERT(storage) {
- return std::make_pair(conveyor<T>{own<conveyor_node>{nullptr}},
- merge_conveyor<T>{});
- }
-
- data->attach(conveyor<T>::to_conveyor(std::move(node_)));
-
- merge_conveyor<T> node_ref{data};
-
- return std::make_pair(conveyor<T>{std::move(merge_node)},
- std::move(node_ref));
-}
-
-template <>
-template <typename ErrorFunc>
-conveyor_sink conveyor<void>::sink(ErrorFunc &&error_func) {
- conveyor_storage *storage = node_->next_storage();
- SAW_ASSERT(storage) { return conveyor_sink{}; }
-
- own<sink_conveyor_node> sink_node =
- heap<sink_conveyor_node>(std::move(node_));
- conveyor_storage *storage_ptr =
- static_cast<conveyor_storage *>(sink_node.get());
-
- storage->set_parent(storage_ptr);
-
- return conveyor_sink{std::move(sink_node)};
-}
-
-void detach_conveyor(conveyor<void> &&conveyor);
-
-template <typename T>
-template <typename ErrorFunc>
-void conveyor<T>::detach(ErrorFunc &&func) {
- detach_conveyor(std::move(then([](T &&) {}, std::move(func))));
-}
-
-template <>
-template <typename ErrorFunc>
-void conveyor<void>::detach(ErrorFunc &&func) {
- detach_conveyor(std::move(then([]() {}, std::move(func))));
-}
-
-template <typename T>
-conveyor<T> conveyor<T>::to_conveyor(own<conveyor_node> node) {
- return conveyor<T>{std::move(node)};
-}
-
-template <typename T>
-own<conveyor_node> conveyor<T>::from_conveyor(conveyor<T> conveyor) {
- return std::move(conveyor.node_);
-}
-
-template <typename T> error_or<fix_void<T>> conveyor<T>::take() {
- SAW_ASSERT(node_) {
- return error_or<fix_void<T>>{
- make_error<err::invalid_state>("conveyor in invalid state")};
- }
- conveyor_storage *storage = node_->next_storage();
- if (storage) {
- if (storage->queued() > 0) {
- error_or<fix_void<T>> result;
- node_->get_result(result);
- return result;
- } else {
- return error_or<fix_void<T>>{
- make_error<err::buffer_exhausted>("conveyor buffer has no elements")};
- }
- } else {
- return error_or<fix_void<T>>{
- make_error<err::invalid_state>("conveyor node has no child storage")};
- }
-}
-
-template <typename T> conveyor_and_feeder<T> new_conveyor_and_feeder() {
- own<adapt_conveyor_feeder<fix_void<T>>> feeder =
- heap<adapt_conveyor_feeder<fix_void<T>>>();
- own<adapt_conveyor_node<fix_void<T>>> node =
- heap<adapt_conveyor_node<fix_void<T>>>();
-
- feeder->set_feedee(node.get());
- node->set_feeder(feeder.get());
-
- return conveyor_and_feeder<T>{std::move(feeder),
- conveyor<T>::to_conveyor(std::move(node))};
-}
-
-// QueueBuffer
-template <typename T> void queue_buffer_conveyor_node<T>::fire() {
- if (child_mixin_.child) {
- if (!storage_.empty()) {
- if (storage_.front().is_error()) {
- if (storage_.front().get_error().is_critical()) {
- child_mixin_.child = nullptr;
- }
- }
- }
- }
-
- bool has_space_before_fire = space() > 0;
-
- if (parent_) {
- parent_->child_has_fired();
- if (!storage_.empty() && parent_->space() > 0) {
- arm_later();
- }
- }
-
- if (!child_mixin_.child) {
- while (!storage_.empty()) {
- storage_.pop();
- }
- return;
- }
-
- conveyor_storage *ch_storage = child_mixin_.child->next_storage();
- if (ch_storage && !has_space_before_fire) {
- ch_storage->parent_has_fired();
- }
-}
-
-template <typename T>
-void queue_buffer_conveyor_node<T>::get_result(error_or_value &eov) noexcept {
- error_or<T> &err_or_val = eov.as<T>();
- err_or_val = std::move(storage_.front());
- storage_.pop();
-}
-
-template <typename T> size_t queue_buffer_conveyor_node<T>::space() const {
- return max_store_ - storage_.size();
-}
-
-template <typename T> size_t queue_buffer_conveyor_node<T>::queued() const {
- return storage_.size();
-}
-
-template <typename T> void queue_buffer_conveyor_node<T>::child_has_fired() {
- if (child_mixin_.child && storage_.size() < max_store_) {
- error_or<T> eov;
- child_mixin_.child->get_result(eov);
-
- if (eov.is_error()) {
- if (eov.get_error().is_critical()) {
- }
- }
-
- storage_.push(std::move(eov));
- if (!is_armed()) {
- arm_later();
- }
- }
-}
-
-template <typename T> void queue_buffer_conveyor_node<T>::parent_has_fired() {
- SAW_ASSERT(parent_) { return; }
-
- if (parent_->space() == 0) {
- return;
- }
-
- if (queued() > 0) {
- arm_later();
- }
-}
-
-template <typename T>
-immediate_conveyor_node<T>::immediate_conveyor_node(fix_void<T> &&val)
- : value_{std::move(val)}, retrieved_{0} {}
-
-template <typename T>
-immediate_conveyor_node<T>::immediate_conveyor_node(error &&error)
- : value_{std::move(error)}, retrieved_{0} {}
-
-template <typename T> size_t immediate_conveyor_node<T>::space() const {
- return 0;
-}
-
-template <typename T> size_t immediate_conveyor_node<T>::queued() const {
- return retrieved_ > 1 ? 0 : 1;
-}
-
-template <typename T> void immediate_conveyor_node<T>::child_has_fired() {
- // Impossible case
- assert(false);
-}
-
-template <typename T> void immediate_conveyor_node<T>::parent_has_fired() {
- SAW_ASSERT(parent_) { return; }
- assert(parent_->space() > 0);
-
- if (queued() > 0) {
- arm_next();
- }
-}
-
-template <typename T> void immediate_conveyor_node<T>::fire() {
-
- if (parent_) {
- parent_->child_has_fired();
- if (queued() > 0 && parent_->space() > 0) {
- arm_last();
- }
- }
-}
-
-template <typename T>
-merge_conveyor<T>::merge_conveyor(lent<merge_conveyor_node_data<T>> d)
- : data_{std::move(d)} {}
-
-template <typename T> merge_conveyor<T>::~merge_conveyor() {}
-
-template <typename T> void merge_conveyor<T>::attach(conveyor<T> conveyor) {
- auto sp = data_.lock();
- SAW_ASSERT(sp) { return; }
-
- sp->attach(std::move(conveyor));
-}
-
-template <typename T>
-merge_conveyor_node<T>::merge_conveyor_node(our<merge_conveyor_node_data<T>> d)
- : data_{d} {
- SAW_ASSERT(data_) { return; }
-
- data_->merger = this;
-}
-
-template <typename T> merge_conveyor_node<T>::~merge_conveyor_node() {}
-
-template <typename T>
-error_or<own<conveyor_node>>
-merge_conveyor_node<T>::swap_child(own<conveyor_node> &&swapee_) noexcept {
- (void)swapee_;
- return make_error<err::invalid_state>(
- "merge_conveyor_node<T>::appendage should block calls to this class");
-}
-
-template <typename T>
-void merge_conveyor_node<T>::get_result(error_or_value &eov) noexcept {
- error_or<fix_void<T>> &err_or_val = eov.as<fix_void<T>>();
-
- SAW_ASSERT(data_) { return; }
-
- /// @todo search appendages for result
-
- auto &appendages = data_->appendages;
- next_appendage_ = std::min(appendages.size(), next_appendage_);
-
- for (size_t i = next_appendage_; i < appendages.size(); ++i) {
- if (appendages[i]->queued() > 0) {
- err_or_val = std::move(appendages[i]->error_or_value_.value());
- appendages[i]->error_or_value_ = std::nullopt;
- next_appendage_ = i + 1;
- return;
- }
- }
- for (size_t i = 0; i < next_appendage_; ++i) {
- if (appendages[i]->queued() > 0) {
- err_or_val = std::move(appendages[i]->error_or_value_.value());
- appendages[i]->error_or_value_ = std::nullopt;
- next_appendage_ = i + 1;
- return;
- }
- }
-
- err_or_val = make_error<err::invalid_state>("No value in Merge appendages");
-}
-
-template <typename T> void merge_conveyor_node<T>::fire() {
- SAW_ASSERT(queued() > 0) { return; }
-
- if (parent_) {
- parent_->child_has_fired();
-
- if (queued() > 0 && parent_->space() > 0) {
- arm_later();
- }
- }
-}
-
-template <typename T> size_t merge_conveyor_node<T>::space() const { return 0; }
-
-template <typename T> size_t merge_conveyor_node<T>::queued() const {
- SAW_ASSERT(data_) { return 0; }
-
- size_t queue_count = 0;
-
- for (auto &iter : data_->appendages) {
- queue_count += iter->queued();
- }
-
- return queue_count;
-}
-
-template <typename T> void merge_conveyor_node<T>::child_has_fired() {
- /// This can never happen
- assert(false);
-}
-
-template <typename T> void merge_conveyor_node<T>::parent_has_fired() {
- SAW_ASSERT(parent_) { return; }
- if (queued() > 0) {
- if (parent_->space() > 0) {
- arm_later();
- }
- }
-}
-
-/**
- * merge_conveyor_node<T>::Apendage
- */
-
-template <typename T>
-error_or<own<conveyor_node>>
-merge_conveyor_node<T>::appendage::swap_child(own<conveyor_node> &&swapee_) {
- own<conveyor_node> old_child = std::move(child);
-
- child = std::move(swapee_);
-
- // This case should never happen
- SAW_ASSERT(old_child) { return make_error<err::invalid_state>("No child exists"); }
-
- return old_child;
-}
-
-template <typename T>
-void merge_conveyor_node<T>::appendage::get_result(error_or_value &eov) {
- error_or<fix_void<T>> &err_or_val = eov.as<fix_void<T>>();
-
- SAW_ASSERT(queued() > 0) {
- err_or_val =
- make_error<err::invalid_state>("No element queued in Merge appendage Node");
- return;
- }
-
- err_or_val = std::move(error_or_value_.value());
- error_or_value_ = std::nullopt;
-}
-
-template <typename T> size_t merge_conveyor_node<T>::appendage::space() const {
- SAW_ASSERT(merger) { return 0; }
-
- if (error_or_value_.has_value()) {
- return 0;
- }
-
- return 1;
-}
-
-template <typename T> size_t merge_conveyor_node<T>::appendage::queued() const {
- SAW_ASSERT(merger) { return 0; }
-
- if (error_or_value_.has_value()) {
- return 1;
- }
-
- return 0;
-}
-
-/// @todo delete this function. Replaced by the regular get_result
-template <typename T>
-void merge_conveyor_node<T>::appendage::get_appendage_result(
- error_or_value &eov) {
- error_or<fix_void<T>> &err_or_val = eov.as<fix_void<T>>();
-
- SAW_ASSERT(queued() > 0) {
- err_or_val =
- make_error<err::invalid_state>("No element queued in Merge appendage Node");
- return;
- }
-
- err_or_val = std::move(error_or_value_.value());
- error_or_value_ = std::nullopt;
-}
-
-template <typename T>
-void merge_conveyor_node<T>::appendage::child_has_fired() {
- SAW_ASSERT(!error_or_value_.has_value()) { return; }
- error_or<fix_void<T>> eov;
- child->get_result(eov);
-
- error_or_value_ = std::move(eov);
-
- if (!merger->is_armed()) {
- merger->arm_later();
- }
-}
-
-template <typename T>
-void merge_conveyor_node<T>::appendage::parent_has_fired() {
- conveyor_storage *child_storage = child->next_storage();
- if (child_storage) {
- child_storage->parent_has_fired();
- }
-}
-
-template <typename T>
-void merge_conveyor_node<T>::appendage::set_parent(conveyor_storage *par) {
- SAW_ASSERT(merger) { return; }
-
- SAW_ASSERT(child) { return; }
-
- parent_ = par;
-}
-
-template <typename T>
-void merge_conveyor_node_data<T>::attach(conveyor<T> conv) {
- auto nas = conveyor<T>::from_conveyor(std::move(conv));
- SAW_ASSERT(nas) { return; }
- conveyor_storage *storage = nas->next_storage();
- SAW_ASSERT(storage) { return; }
-
- auto merge_node_appendage =
- heap<typename merge_conveyor_node<T>::appendage>(std::move(nas),
- *merger);
- auto merge_node_appendage_ptr = merge_node_appendage.get();
-
- storage->set_parent(merge_node_appendage.get());
-
- SAW_ASSERT(merger) { return; }
-
- conveyor_storage *mrg_storage = merger->next_storage();
- SAW_ASSERT(mrg_storage) { return; }
-
- merge_node_appendage->set_parent(mrg_storage);
-
- appendages.push_back(std::move(merge_node_appendage));
-
- /// @todo return this. necessary? maybe for the weird linking setup
- /// maybe not
- // return merge_node_appendage_ptr;
-}
-
-template <typename T>
-void merge_conveyor_node_data<T>::governing_node_destroyed() {
- appendages.clear();
- merger = nullptr;
-}
-
-template <typename T> adapt_conveyor_feeder<T>::~adapt_conveyor_feeder() {
- if (feedee_) {
- feedee_->set_feeder(nullptr);
- feedee_ = nullptr;
- }
-}
-
-template <typename T>
-void adapt_conveyor_feeder<T>::set_feedee(adapt_conveyor_node<T> *feedee_p) {
- feedee_ = feedee_p;
-}
-
-template <typename T> void adapt_conveyor_feeder<T>::feed(T &&value) {
- if (feedee_) {
- feedee_->feed(std::move(value));
- }
-}
-
-template <typename T> void adapt_conveyor_feeder<T>::fail(error &&error) {
- if (feedee_) {
- feedee_->fail(std::move(error));
- }
-}
-
-template <typename T> size_t adapt_conveyor_feeder<T>::queued() const {
- if (feedee_) {
- return feedee_->queued();
- }
- return 0;
-}
-
-template <typename T> size_t adapt_conveyor_feeder<T>::space() const {
- if (feedee_) {
- return feedee_->space();
- }
- return 0;
-}
-
-template <typename T>
-error adapt_conveyor_feeder<T>::swap(conveyor<T> &&conv) noexcept {
- SAW_ASSERT(feedee_) { return make_error<err::invalid_state>("No feedee connected"); }
-
- auto node = conveyor<T>::from_conveyor(std::move(conv));
-
- feedee_->swap_child(std::move(node));
-
- return no_error();
-}
-
-template <typename T>
-adapt_conveyor_node<T>::adapt_conveyor_node() : conveyor_event_storage{} {}
-
-template <typename T> adapt_conveyor_node<T>::~adapt_conveyor_node() {
- if (feeder_) {
- feeder_->set_feedee(nullptr);
- feeder_ = nullptr;
- }
-}
-
-template <typename T>
-error_or<own<conveyor_node>>
-adapt_conveyor_node<T>::swap_child(own<conveyor_node> &&swapee) noexcept {
- // This should return the owning pointer of this instance
- auto myself_err = parent_node_.swap_child_of_parent(std::move(swapee));
-
- if (myself_err.is_error()) {
- return myself_err;
- }
-
- auto &myself = myself_err.get_value();
-
- assert(myself.get() == this);
-
- return myself_err;
-}
-
-template <typename T>
-conveyor_storage *adapt_conveyor_node<T>::next_storage() noexcept {
- return static_cast<conveyor_storage *>(this);
-}
-
-template <typename T>
-void adapt_conveyor_node<T>::notify_parent_attached(
- conveyor_node &par) noexcept {
- parent_node_.change_parent(&par);
-}
-
-template <typename T>
-void adapt_conveyor_node<T>::set_feeder(adapt_conveyor_feeder<T> *feeder_p) {
- feeder_ = feeder_p;
-}
-
-template <typename T> void adapt_conveyor_node<T>::feed(T &&value) {
- storage_.push(std::move(value));
- arm_next();
-}
-
-template <typename T> void adapt_conveyor_node<T>::fail(error &&error) {
- storage_.push(std::move(error));
- arm_next();
-}
-
-template <typename T> size_t adapt_conveyor_node<T>::queued() const {
- return storage_.size();
-}
-
-template <typename T> size_t adapt_conveyor_node<T>::space() const {
- return std::numeric_limits<size_t>::max() - storage_.size();
-}
-
-template <typename T>
-void adapt_conveyor_node<T>::get_result(error_or_value &err_or_val) {
- if (!storage_.empty()) {
- err_or_val.as<T>() = std::move(storage_.front());
- storage_.pop();
- } else {
- err_or_val.as<T>() = make_error<err::invalid_state>(
- "Signal for retrieval of storage sent even though no "
- "data is present");
- }
-}
-
-template <typename T> void adapt_conveyor_node<T>::child_has_fired() {
- // Adapt node has no children
- assert(false);
-}
-
-template <typename T> void adapt_conveyor_node<T>::parent_has_fired() {
- SAW_ASSERT(parent_) { return; }
-
- if (parent_->space() == 0) {
- return;
- }
-}
-
-template <typename T> void adapt_conveyor_node<T>::fire() {
- if (parent_) {
- parent_->child_has_fired();
-
- if (storage_.size() > 0) {
- arm_later();
- }
- }
-}
-
-template <typename T> one_time_conveyor_feeder<T>::~one_time_conveyor_feeder() {
- if (feedee_) {
- feedee_->set_feeder(nullptr);
- feedee_ = nullptr;
- }
-}
-
-template <typename T>
-void one_time_conveyor_feeder<T>::set_feedee(
- one_time_conveyor_node<T> *feedee_p) {
- feedee_ = feedee_p;
-}
-
-template <typename T> void one_time_conveyor_feeder<T>::feed(T &&value) {
- if (feedee_) {
- feedee_->feed(std::move(value));
- }
-}
-
-template <typename T> void one_time_conveyor_feeder<T>::fail(error &&error) {
- if (feedee_) {
- feedee_->fail(std::move(error));
- }
-}
-
-template <typename T> size_t one_time_conveyor_feeder<T>::queued() const {
- if (feedee_) {
- return feedee_->queued();
- }
- return 0;
-}
-
-template <typename T> size_t one_time_conveyor_feeder<T>::space() const {
- if (feedee_) {
- return feedee_->space();
- }
- return 0;
-}
-
-template <typename T> one_time_conveyor_node<T>::~one_time_conveyor_node() {
- if (feeder_) {
- feeder_->set_feedee(nullptr);
- feeder_ = nullptr;
- }
-}
-
-template <typename T>
-void one_time_conveyor_node<T>::set_feeder(
- one_time_conveyor_feeder<T> *feeder_p) {
- feeder_ = feeder_p;
-}
-
-template <typename T> void one_time_conveyor_node<T>::feed(T &&value) {
- storage_ = std::move(value);
- arm_next();
-}
-
-template <typename T> void one_time_conveyor_node<T>::fail(error &&error) {
- storage_ = std::move(error);
- arm_next();
-}
-
-template <typename T> size_t one_time_conveyor_node<T>::queued() const {
- return storage_.has_value() ? 1 : 0;
-}
-
-template <typename T> size_t one_time_conveyor_node<T>::space() const {
- return passed_ ? 0 : 1;
-}
-
-template <typename T>
-void one_time_conveyor_node<T>::get_result(error_or_value &err_or_val) {
- if (storage_.has_value()) {
- err_or_val.as<T>() = std::move(storage_.value());
- storage_ = std::nullopt;
- } else {
- err_or_val.as<T>() = make_error<err::invalid_state>(
- "Signal for retrieval of storage sent even though no "
- "data is present");
- }
-}
-
-template <typename T> void one_time_conveyor_node<T>::fire() {
- if (parent_) {
- parent_->child_has_fired();
- }
-}
-
-} // namespace saw