diff options
author | Claudius Holeksa <mail@keldu.de> | 2023-05-03 20:34:02 +0200 |
---|---|---|
committer | Claudius Holeksa <mail@keldu.de> | 2023-05-03 20:34:02 +0200 |
commit | 2aa2af0007b7e969845642027c635cd3fd9c8aea (patch) | |
tree | e72a05a3c2bfe58442b160c0c8e98ce1d095f36f /forstio/async | |
parent | 9b81a2585142260f89d47cbe1e592cec9e1f778f (diff) |
Moved dirs and added codec-json dir
Diffstat (limited to 'forstio/async')
-rw-r--r-- | forstio/async/.nix/derivation.nix | 31 | ||||
-rw-r--r-- | forstio/async/SConscript | 38 | ||||
-rw-r--r-- | forstio/async/SConstruct | 66 | ||||
-rw-r--r-- | forstio/async/async.cpp | 419 | ||||
-rw-r--r-- | forstio/async/async.h | 1023 | ||||
-rw-r--r-- | forstio/async/async.tmpl.h | 769 |
6 files changed, 0 insertions, 2346 deletions
diff --git a/forstio/async/.nix/derivation.nix b/forstio/async/.nix/derivation.nix deleted file mode 100644 index 8ceac08..0000000 --- a/forstio/async/.nix/derivation.nix +++ /dev/null @@ -1,31 +0,0 @@ -{ lib -, stdenvNoCC -, scons -, clang -, clang-tools -, version -, forstio -}: - -let - -in stdenvNoCC.mkDerivation { - pname = "forstio-async"; - inherit version; - - src = ./..; - - enableParallelBuilding = true; - - nativeBuildInputs = [ - scons - clang - clang-tools - ]; - - buildInputs = [ - forstio.core - ]; - - outputs = ["out" "dev"]; -} diff --git a/forstio/async/SConscript b/forstio/async/SConscript deleted file mode 100644 index 69f8950..0000000 --- a/forstio/async/SConscript +++ /dev/null @@ -1,38 +0,0 @@ -#!/bin/false - -import os -import os.path -import glob - - -Import('env') - -dir_path = Dir('.').abspath - -# Environment for base library -async_env = env.Clone(); - -async_env.sources = sorted(glob.glob(dir_path + "/*.cpp")) -async_env.headers = sorted(glob.glob(dir_path + "/*.h")) - -env.sources += async_env.sources; -env.headers += async_env.headers; - -## Shared lib -objects_shared = [] -async_env.add_source_files(objects_shared, async_env.sources, shared=True); -async_env.library_shared = async_env.SharedLibrary('#build/forstio-async', [objects_shared]); - -## Static lib -objects_static = [] -async_env.add_source_files(objects_static, async_env.sources, shared=False); -async_env.library_static = async_env.StaticLibrary('#build/forstio-async', [objects_static]); - -# Set Alias -env.Alias('library_async', [async_env.library_shared, async_env.library_static]); - -env.targets += ['library_async']; - -# Install -env.Install('$prefix/lib/', [async_env.library_shared, async_env.library_static]); -env.Install('$prefix/include/forstio/async/', [async_env.headers]); diff --git a/forstio/async/SConstruct b/forstio/async/SConstruct deleted file mode 100644 index 0d7b7c6..0000000 --- a/forstio/async/SConstruct +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/env python3 - -import sys -import os -import os.path -import glob -import re - - -if sys.version_info < (3,): - def isbasestring(s): - return isinstance(s,basestring) -else: - def isbasestring(s): - return isinstance(s, (str,bytes)) - -def add_kel_source_files(self, sources, filetype, lib_env=None, shared=False, target_post=""): - - if isbasestring(filetype): - dir_path = self.Dir('.').abspath - filetype = sorted(glob.glob(dir_path+"/"+filetype)) - - for path in filetype: - target_name = re.sub( r'(.*?)(\.cpp|\.c\+\+)', r'\1' + target_post, path ) - if shared: - target_name+='.os' - sources.append( self.SharedObject( target=target_name, source=path ) ) - else: - target_name+='.o' - sources.append( self.StaticObject( target=target_name, source=path ) ) - pass - -def isAbsolutePath(key, dirname, env): - assert os.path.isabs(dirname), "%r must have absolute path syntax" % (key,) - -env_vars = Variables( - args=ARGUMENTS -) - -env_vars.Add('prefix', - help='Installation target location of build results and headers', - default='/usr/local/', - validator=isAbsolutePath -) - -env=Environment(ENV=os.environ, variables=env_vars, CPPPATH=[], - CPPDEFINES=['SAW_UNIX'], - CXXFLAGS=['-std=c++20','-g','-Wall','-Wextra'], - LIBS=['forstio-core']) -env.__class__.add_source_files = add_kel_source_files -env.Tool('compilation_db'); -env.cdb = env.CompilationDatabase('compile_commands.json'); - -env.objects = []; -env.sources = []; -env.headers = []; -env.targets = []; - -Export('env') -SConscript('SConscript') - -env.Alias('cdb', env.cdb); -env.Alias('all', [env.targets]); -env.Default('all'); - -env.Alias('install', '$prefix') diff --git a/forstio/async/async.cpp b/forstio/async/async.cpp deleted file mode 100644 index c53ffa6..0000000 --- a/forstio/async/async.cpp +++ /dev/null @@ -1,419 +0,0 @@ -#include "async.h" -#include <forstio/core/common.h> -#include <forstio/core/error.h> - -#include <algorithm> -#include <cassert> - -namespace saw { -namespace { -thread_local event_loop *local_loop = nullptr; - -event_loop ¤t_event_loop() { - event_loop *loop = local_loop; - assert(loop); - return *loop; -} -} // namespace - -conveyor_node::conveyor_node() {} - -conveyor_node_with_child_mixin::conveyor_node_with_child_mixin( - own<conveyor_node> &&child_, conveyor_node &owner) - : child{std::move(child_)} { - assert(child); - - child->notify_parent_attached(owner); -} - -error_or<own<conveyor_node>> -conveyor_node_with_child_mixin::swap_child(own<conveyor_node> &&swapee) { - SAW_ASSERT(child) { - return make_error<err::invalid_state>("Child should exist if this function is called"); - } - own<conveyor_node> old_child = std::move(child); - - /** - * We need the parent of the old_child's next storage - */ - conveyor_storage *old_storage = old_child->next_storage(); - conveyor_storage *old_storage_parent = - old_storage ? old_storage->get_parent() : nullptr; - - /** - * Swap in the new child - */ - if (swapee) { - child = std::move(swapee); - - /** - * Then we need to set the new child's storage parent since the next - * storage has a nullptr set And if the old_storage_parent is a nullptr, - * then it doesn't matter. So we don't check for it - */ - conveyor_storage *swapee_storage = child->next_storage(); - if (swapee_storage) { - swapee_storage->set_parent(old_storage_parent); - } - } - - return old_child; -} - -conveyor_storage::conveyor_storage() {} - -conveyor_storage::~conveyor_storage() {} - -conveyor_storage *conveyor_storage::get_parent() const { return parent_; } - -void conveyor_event_storage::set_parent(conveyor_storage *p) { - /* - * parent check isn't needed, but is used - * for the assert, because the storage should - * be armed if there was an element present - * and a valid parent - */ - if (/*!parent && */ p && !is_armed() && queued() > 0) { - assert(!parent_); - if (p->space() > 0) { - arm_later(); - } - } - - parent_ = p; -} - -conveyor_event_storage::conveyor_event_storage() : conveyor_storage{} {} - -conveyor_base::conveyor_base(own<conveyor_node> &&node_p) - : node_{std::move(node_p)} {} - -error propagate_error::operator()(const error &error) const { - return error.copy_error(); -} - -error propagate_error::operator()(error &&err) { return std::move(err); } - -event::event() : event(current_event_loop()) {} - -event::event(event_loop &loop) : loop_{loop} {} - -event::~event() { disarm(); } - -void event::arm_next() { - assert(&loop_ == local_loop); - if (prev_ == nullptr) { - // Push the next_insert_point back by one - // and inserts itself before that - next_ = *loop_.next_insert_point_; - prev_ = loop_.next_insert_point_; - *prev_ = this; - if (next_) { - next_->prev_ = &next_; - } - - // Set the new insertion ptr location to next - loop_.next_insert_point_ = &next_; - - // Pushes back the later insert point if it was pointing at the - // previous event - if (loop_.later_insert_point_ == prev_) { - loop_.later_insert_point_ = &next_; - } - - // If tail_ points at the same location then - // we are at the end and have to update tail_ then. - // Technically should be possible by checking if - // next is a `nullptr` - if (loop_.tail_ == prev_) { - loop_.tail_ = &next_; - } - - loop_.set_runnable(true); - } -} - -void event::arm_later() { - assert(&loop_ == local_loop); - - if (prev_ == nullptr) { - next_ = *loop_.later_insert_point_; - prev_ = loop_.later_insert_point_; - *prev_ = this; - if (next_) { - next_->prev_ = &next_; - } - - loop_.later_insert_point_ = &next_; - if (loop_.tail_ == prev_) { - loop_.tail_ = &next_; - } - - loop_.set_runnable(true); - } -} - -void event::arm_last() { - assert(&loop_ == local_loop); - - if (prev_ == nullptr) { - next_ = *loop_.later_insert_point_; - prev_ = loop_.later_insert_point_; - *prev_ = this; - if (next_) { - next_->prev_ = &next_; - } - - if (loop_.tail_ == prev_) { - loop_.tail_ = &next_; - } - - loop_.set_runnable(true); - } -} - -void event::disarm() { - if (prev_ != nullptr) { - if (loop_.tail_ == &next_) { - loop_.tail_ = prev_; - } - - if (loop_.next_insert_point_ == &next_) { - loop_.next_insert_point_ = prev_; - } - - *prev_ = next_; - if (next_) { - next_->prev_ = prev_; - } - - prev_ = nullptr; - next_ = nullptr; - } -} - -bool event::is_armed() const { return prev_ != nullptr; } - -conveyor_sink::conveyor_sink() : node_{nullptr} {} - -conveyor_sink::conveyor_sink(own<conveyor_node> &&node_p) - : node_{std::move(node_p)} {} - -void event_loop::set_runnable(bool runnable) { is_runnable_ = runnable; } - -event_loop::event_loop() {} - -event_loop::event_loop(own<class event_port> &&ep) - : event_port_{std::move(ep)} {} - -event_loop::~event_loop() { assert(local_loop != this); } - -void event_loop::enter_scope() { - assert(!local_loop); - local_loop = this; -} - -void event_loop::leave_scope() { - assert(local_loop == this); - local_loop = nullptr; -} - -bool event_loop::turn_loop() { - size_t turn_step = 0; - while (head_ && turn_step < 65536) { - if (!turn()) { - return false; - } - ++turn_step; - } - return true; -} - -bool event_loop::turn() { - event *event = head_; - - if (!event) { - return false; - } - - head_ = event->next_; - if (head_) { - head_->prev_ = &head_; - } - - next_insert_point_ = &head_; - if (later_insert_point_ == &event->next_) { - later_insert_point_ = &head_; - } - if (tail_ == &event->next_) { - tail_ = &head_; - } - - event->next_ = nullptr; - event->prev_ = nullptr; - - next_insert_point_ = &head_; - - event->fire(); - - return true; -} - -bool event_loop::wait(const std::chrono::steady_clock::duration &duration) { - if (event_port_) { - event_port_->wait(duration); - } - - return turn_loop(); -} - -bool event_loop::wait(const std::chrono::steady_clock::time_point &time_point) { - if (event_port_) { - event_port_->wait(time_point); - } - - return turn_loop(); -} - -bool event_loop::wait() { - if (event_port_) { - event_port_->wait(); - } - - return turn_loop(); -} - -bool event_loop::poll() { - if (event_port_) { - event_port_->poll(); - } - - return turn_loop(); -} - -event_port *event_loop::event_port() { return event_port_.get(); } - -conveyor_sink_set &event_loop::daemon() { - if (!daemon_sink_) { - daemon_sink_ = heap<conveyor_sink_set>(); - } - return *daemon_sink_; -} - -wait_scope::wait_scope(event_loop &loop) : loop_{loop} { loop_.enter_scope(); } - -wait_scope::~wait_scope() { loop_.leave_scope(); } - -void wait_scope::wait() { loop_.wait(); } - -void wait_scope::wait(const std::chrono::steady_clock::duration &duration) { - loop_.wait(duration); -} - -void wait_scope::wait(const std::chrono::steady_clock::time_point &time_point) { - loop_.wait(time_point); -} - -void wait_scope::poll() { loop_.poll(); } - -error_or<own<conveyor_node>> -convert_conveyor_node_base::swap_child(own<conveyor_node> &&swapee) noexcept { - return child_mixin_.swap_child(std::move(swapee)); -} - -conveyor_storage *convert_conveyor_node_base::next_storage() noexcept { - if (!child_mixin_.child) { - return nullptr; - } - return child_mixin_.child->next_storage(); -} - -immediate_conveyor_node_base::immediate_conveyor_node_base() - : conveyor_event_storage{} {} - -merge_conveyor_node_base::merge_conveyor_node_base() - : conveyor_event_storage{} {} - -error_or<own<conveyor_node>> queue_buffer_conveyor_node_base::swap_child( - own<conveyor_node> &&swapee_) noexcept { - return child_mixin_.swap_child(std::move(swapee_)); -} - -void conveyor_sink_set::destroy_sink_conveyor_node(conveyor_node &node) { - if (!is_armed()) { - arm_last(); - } - - delete_nodes_.push(&node); -} - -void conveyor_sink_set::fail(error &&error) { - /// @todo call error_handler -} - -conveyor_sink_set::conveyor_sink_set(event_loop &event_loop) - : event{event_loop} {} - -void conveyor_sink_set::add(conveyor<void> &&sink) { - auto nas = conveyor<void>::from_conveyor(std::move(sink)); - SAW_ASSERT(nas) { return; } - conveyor_storage *storage = nas->next_storage(); - - own<sink_conveyor_node> sink_node = nullptr; - try { - sink_node = heap<sink_conveyor_node>(std::move(nas), *this); - } catch (std::bad_alloc &) { - return; - } - if (storage) { - storage->set_parent(sink_node.get()); - } - - sink_nodes_.emplace_back(std::move(sink_node)); -} - -void conveyor_sink_set::fire() { - while (!delete_nodes_.empty()) { - conveyor_node *node = delete_nodes_.front(); - /*auto erased = */ std::remove_if(sink_nodes_.begin(), - sink_nodes_.end(), - [node](own<conveyor_node> &element) { - return node == element.get(); - }); - delete_nodes_.pop(); - } -} - -convert_conveyor_node_base::convert_conveyor_node_base(own<conveyor_node> &&dep) - : child_mixin_{std::move(dep), *this} {} - -void convert_conveyor_node_base::get_result(error_or_value &err_or_val) { - get_impl(err_or_val); -} - -void attach_conveyor_node_base::get_result( - error_or_value &err_or_val) noexcept { - if (child_mixin_.child) { - child_mixin_.child->get_result(err_or_val); - } -} - -error_or<own<conveyor_node>> -attach_conveyor_node_base::swap_child(own<conveyor_node> &&swapee_) noexcept { - return child_mixin_.swap_child(std::move(swapee_)); -} - -conveyor_storage *attach_conveyor_node_base::next_storage() noexcept { - if (!child_mixin_.child) { - return nullptr; - } - - return child_mixin_.child->next_storage(); -} - -void detach_conveyor(conveyor<void> &&conveyor) { - event_loop &loop = current_event_loop(); - conveyor_sink_set &sink = loop.daemon(); - sink.add(std::move(conveyor)); -} -} // namespace saw diff --git a/forstio/async/async.h b/forstio/async/async.h deleted file mode 100644 index 4cfed60..0000000 --- a/forstio/async/async.h +++ /dev/null @@ -1,1023 +0,0 @@ -#pragma once - -#include <forstio/core/common.h> -#include <forstio/core/error.h> - -#include <chrono> -#include <functional> -#include <limits> -#include <list> -#include <queue> -#include <type_traits> - -namespace saw { -class conveyor_storage; -class conveyor_node { -public: - conveyor_node(); - virtual ~conveyor_node() = default; - - /** - * Internal method to retrieve results from children - */ - virtual void get_result(error_or_value &err_or_val) = 0; - - /** - * Swap out child with another one - */ - virtual error_or<own<conveyor_node>> - swap_child(own<conveyor_node> &&swapee_) = 0; - - /** - * Retrieve the next storage node - */ - virtual conveyor_storage *next_storage() = 0; - - /** - * Notify that a new parent was attached - * Only relevant for the feeding nodes - */ - virtual void notify_parent_attached(conveyor_node &){}; -}; - -class conveyor_node_with_child_mixin final { -public: - own<conveyor_node> child = nullptr; - - conveyor_node_with_child_mixin(own<conveyor_node> &&child_, - conveyor_node &owner_); - ~conveyor_node_with_child_mixin() = default; - - /** - * Swap out children and return the child ptr, since the caller is the child - * itself. Stack needs to be cleared before the child is destroyed, so the - * swapped out node is returned as well. - */ - error_or<own<conveyor_node>> swap_child(own<conveyor_node> &&swapee); -}; - -class conveyor_node_with_parent_mixin final { -public: - conveyor_node *parent = nullptr; - - error_or<own<conveyor_node>> - swap_child_of_parent(own<conveyor_node> &&swapee) { - SAW_ASSERT(parent) { - return make_error<err::invalid_state>( - "Can't swap child, because parent doesn't exist"); - } - - return parent->swap_child(std::move(swapee)); - } - void change_parent(conveyor_node *p) { parent = p; } -}; - -class event_loop; -class wait_scope; -/* - * Event class similar to capn'proto. - * https://github.com/capnproto/capnproto - */ -class event { -private: - event_loop &loop_; - event **prev_ = nullptr; - event *next_ = nullptr; - - friend class event_loop; - -public: - event(); - event(event_loop &loop); - virtual ~event(); - - virtual void fire() = 0; - - void arm_next(); - void arm_later(); - void arm_last(); - void disarm(); - - bool is_armed() const; -}; - -class conveyor_storage { -protected: - conveyor_storage *parent_ = nullptr; - -public: - conveyor_storage(); - virtual ~conveyor_storage(); - - virtual size_t space() const = 0; - virtual size_t queued() const = 0; - virtual void child_has_fired() = 0; - virtual void parent_has_fired() = 0; - - virtual void set_parent(conveyor_storage *parent) = 0; - conveyor_storage *get_parent() const; -}; - -class conveyor_event_storage : public conveyor_storage, public event { -public: - conveyor_event_storage(); - virtual ~conveyor_event_storage() = default; - - void set_parent(conveyor_storage *parent) override; -}; - -class conveyor_base { -protected: - own<conveyor_node> node_; - -public: - conveyor_base(own<conveyor_node> &&node_p); - virtual ~conveyor_base() = default; - - conveyor_base(conveyor_base &&) = default; - conveyor_base &operator=(conveyor_base &&) = default; - - void get(error_or_value &err_or_val); -}; - -template <typename T> class conveyor; - -template <typename T> conveyor<T> chained_conveyor_type(T *); - -// template <typename T> Conveyor<T> chainedConveyorType(Conveyor<T> *); - -template <typename T> T remove_error_or_type(T *); - -template <typename T> T remove_error_or_type(error_or<T> *); - -template <typename T> -using remove_error_or = decltype(remove_error_or_type((T *)nullptr)); - -template <typename T> -using chained_conveyors = decltype(chained_conveyor_type((T *)nullptr)); - -template <typename Func, typename T> -using conveyor_result = - chained_conveyors<remove_error_or<return_type<Func, T>>>; - -struct propagate_error { -public: - error operator()(const error &err) const; - error operator()(error &&err); -}; - -class conveyor_sink { -private: - own<conveyor_node> node_; - -public: - conveyor_sink(); - conveyor_sink(own<conveyor_node> &&node); - - conveyor_sink(conveyor_sink &&) = default; - conveyor_sink &operator=(conveyor_sink &&) = default; -}; - -template <typename T> class merge_conveyor_node_data; - -template <typename T> class merge_conveyor { -private: - lent<merge_conveyor_node_data<T>> data_; - -public: - merge_conveyor() = default; - merge_conveyor(lent<merge_conveyor_node_data<T>> d); - ~merge_conveyor(); - - void attach(conveyor<T> conv); -}; - -/** - * Main interface for async operations. - */ -template <typename T> class conveyor final : public conveyor_base { -public: - /** - * Construct an immediately fulfilled node - */ - conveyor(fix_void<T> value); - - /** - * Construct an immediately failed node - */ - conveyor(error &&err); - - /** - * Construct a conveyor with a child node - */ - conveyor(own<conveyor_node> node_p); - - conveyor(conveyor<T> &&) = default; - conveyor<T> &operator=(conveyor<T> &&) = default; - - /** - * This method converts values or errors from children - */ - template <typename Func, typename ErrorFunc = propagate_error> - [[nodiscard]] conveyor_result<Func, T> - then(Func &&func, ErrorFunc &&error_func = propagate_error()); - - /** - * This method adds a buffer node in the conveyor chains which acts as a - * scheduler interrupt point and collects elements up to the supplied limit. - */ - [[nodiscard]] conveyor<T> - buffer(size_t limit = std::numeric_limits<size_t>::max()); - - /** - * This method just takes ownership of any supplied types, - * which are destroyed when the chain gets destroyed. - * Useful for resource lifetime control. - */ - template <typename... Args> - [[nodiscard]] conveyor<T> attach(Args &&...args); - - /** @todo implement - * This method limits the total amount of passed elements - * Be careful where you place this node into the chain. - * If you meant to fork it and destroy paths you shouldn't place - * an interrupt point between the fork and this limiter - */ - [[nodiscard]] conveyor<T> limit(size_t val = 1); - - /** - * - */ - [[nodiscard]] std::pair<conveyor<T>, merge_conveyor<T>> merge(); - - /** - * Moves the conveyor chain into a thread local storage point which drops - * every element. Use sink() if you want to control the lifetime of a - * conveyor chain - */ - template <typename ErrorFunc = propagate_error> - void detach(ErrorFunc &&err_func = propagate_error()); - /** - * Creates a local sink which drops elements, but lifetime control remains - * in your hand. - */ - template <typename ErrorFunc = propagate_error> - [[nodiscard]] conveyor_sink - sink(ErrorFunc &&error_func = propagate_error()); - - /** - * If no sink() or detach() is used you have to take elements out of the - * chain yourself. - */ - error_or<fix_void<T>> take(); - - /** @todo implement - * Specifically pump elements through this chain with the provided - * wait_scope - */ - void poll(wait_scope &wait_scope); - - // helper - static conveyor<T> to_conveyor(own<conveyor_node> node); - - // helper - static own<conveyor_node> from_conveyor(conveyor<T> conveyor); -}; - -template <typename Func> conveyor_result<Func, void> exec_later(Func &&func); - -/* - * Join Conveyors into a single one - */ -template <typename... Args> -conveyor<std::tuple<Args...>> -join_conveyors(std::tuple<conveyor<Args>...> &conveyors); - -template <typename T> class conveyor_feeder { -public: - virtual ~conveyor_feeder() = default; - - virtual void feed(T &&data) = 0; - virtual void fail(error &&error) = 0; - - virtual size_t space() const = 0; - virtual size_t queued() const = 0; - - virtual error swap(conveyor<T> &&conveyor) noexcept = 0; -}; - -template <> class conveyor_feeder<void> { -public: - virtual ~conveyor_feeder() = default; - - virtual void feed(void_t &&value = void_t{}) = 0; - virtual void fail(error &&error) = 0; - - virtual size_t space() const = 0; - virtual size_t queued() const = 0; - - virtual error swap(conveyor<void_t> &&conveyor) noexcept = 0; -}; - -template <typename T> struct conveyor_and_feeder { - own<conveyor_feeder<T>> feeder; - conveyor<T> conveyor; -}; - -template <typename T> conveyor_and_feeder<T> new_conveyor_and_feeder(); - -template <typename T> conveyor_and_feeder<T> one_time_conveyor_and_feeder(); - -enum class Signal : uint8_t { Terminate, User1 }; - -/** - * Class which acts as a correspondent between the running framework and outside - * events which may be signals from the operating system or just other threads. - * Default EventPorts are supplied by setupAsyncIo() in io.h - */ -class event_port { -public: - virtual ~event_port() = default; - - virtual conveyor<void> on_signal(Signal signal) = 0; - - virtual void poll() = 0; - virtual void wait() = 0; - virtual void wait(const std::chrono::steady_clock::duration &) = 0; - virtual void wait(const std::chrono::steady_clock::time_point &) = 0; - - virtual void wake() = 0; -}; - -class sink_conveyor_node; - -class conveyor_sink_set final : public event { -private: - /* - class Helper final : public Event { - private: - void destroySinkConveyorNode(ConveyorNode& sink); - void fail(Error&& error); - - std::vector<Own<ConveyorNode>> sink_nodes; - std::queue<ConveyorNode*> delete_nodes; - std::function<void(Error&& error)> error_handler; - - public: - ConveyorSinks() = default; - ConveyorSinks(EventLoop& event_loop); - - void add(Conveyor<void> node); - - void fire() override {} - }; - - gin::Own<Helper> helper; - */ - friend class sink_conveyor_node; - - void destroy_sink_conveyor_node(conveyor_node &sink_node); - void fail(error &&err); - - std::list<own<conveyor_node>> sink_nodes_; - - std::queue<conveyor_node *> delete_nodes_; - - std::function<void(error &&)> error_handler_; - -public: - // ConveyorSinks(); - // ConveyorSinks(EventLoop& event_loop); - conveyor_sink_set() = default; - conveyor_sink_set(event_loop &event_loop); - - void add(conveyor<void> &&node); - - void fire() override; -}; - -/* - * EventLoop class similar to capn'proto. - * https://github.com/capnproto/capnproto - */ -class event_loop { -private: - friend class event; - event *head_ = nullptr; - event **tail_ = &head_; - event **next_insert_point_ = &head_; - event **later_insert_point_ = &head_; - - bool is_runnable_ = false; - - own<event_port> event_port_ = nullptr; - - own<conveyor_sink_set> daemon_sink_ = nullptr; - - // functions - void set_runnable(bool runnable); - - friend class wait_scope; - void enter_scope(); - void leave_scope(); - - bool turn_loop(); - bool turn(); - -public: - event_loop(); - event_loop(own<event_port> &&port); - ~event_loop(); - - event_loop(event_loop &&) = default; - event_loop &operator=(event_loop &&) = default; - - bool wait(); - bool wait(const std::chrono::steady_clock::duration &); - bool wait(const std::chrono::steady_clock::time_point &); - bool poll(); - - event_port *event_port(); - - conveyor_sink_set &daemon(); -}; - -/* - * WaitScope class similar to capn'proto. - * https://github.com/capnproto/capnproto - */ -class wait_scope { -private: - event_loop &loop_; - -public: - wait_scope(event_loop &loop); - ~wait_scope(); - - void wait(); - void wait(const std::chrono::steady_clock::duration &); - void wait(const std::chrono::steady_clock::time_point &); - void poll(); -}; - -template <typename Func> conveyor_result<Func, void> yield_next(Func &&func); - -template <typename Func> conveyor_result<Func, void> yield_later(Func &&func); - -template <typename Func> conveyor_result<Func, void> yield_last(Func &&func); -} // namespace saw - -// Secret stuff -// Aka private semi hidden classes -namespace saw { - -template <typename Out, typename In> struct fix_void_caller { - template <typename Func> static Out apply(Func &func, In &&in) { - return func(std::move(in)); - } -}; - -template <typename Out> struct fix_void_caller<Out, void_t> { - template <typename Func> static Out apply(Func &func, void_t &&in) { - (void)in; - return func(); - } -}; - -template <typename In> struct fix_void_caller<void_t, In> { - template <typename Func> static void_t apply(Func &func, In &&in) { - func(std::move(in)); - return void_t{}; - } -}; - -template <> struct fix_void_caller<void_t, void_t> { - template <typename Func> static void_t apply(Func &func, void_t &&in) { - (void)in; - func(); - return void_t{}; - } -}; - -template <typename T> class adapt_conveyor_node; - -template <typename T> -class adapt_conveyor_feeder final : public conveyor_feeder<unfix_void<T>> { -private: - adapt_conveyor_node<T> *feedee_ = nullptr; - -public: - ~adapt_conveyor_feeder(); - - void set_feedee(adapt_conveyor_node<T> *feedee); - - void feed(T &&value) override; - void fail(error &&error) override; - - size_t space() const override; - size_t queued() const override; - - error swap(conveyor<T> &&conv) noexcept override; -}; - -template <typename T> -class adapt_conveyor_node final : public conveyor_node, - public conveyor_event_storage { -private: - adapt_conveyor_feeder<T> *feeder_ = nullptr; - - std::queue<error_or<unfix_void<T>>> storage_; - - conveyor_node_with_parent_mixin parent_node_; - -public: - adapt_conveyor_node(); - ~adapt_conveyor_node(); - - void set_feeder(adapt_conveyor_feeder<T> *feeder); - - void feed(T &&value); - void fail(error &&error); - - // ConveyorNode - void get_result(error_or_value &err_or_val) override; - - error_or<own<conveyor_node>> - swap_child(own<conveyor_node> &&swapee) noexcept override; - - conveyor_storage *next_storage() noexcept override; - void notify_parent_attached(conveyor_node &) noexcept override; - - // ConveyorStorage - size_t space() const override; - size_t queued() const override; - - void child_has_fired() override; - void parent_has_fired() override; - - // Event - void fire() override; -}; - -template <typename T> class one_time_conveyor_node; - -template <typename T> -class one_time_conveyor_feeder final : public conveyor_feeder<unfix_void<T>> { -private: - one_time_conveyor_node<T> *feedee_ = nullptr; - -public: - ~one_time_conveyor_feeder(); - - void set_feedee(one_time_conveyor_node<T> *feedee); - - void feed(T &&value) override; - void fail(error &&error) override; - - size_t space() const override; - size_t queued() const override; -}; - -template <typename T> -class one_time_conveyor_node final : public conveyor_node, - public conveyor_storage, - public event { -private: - one_time_conveyor_feeder<T> *feeder_ = nullptr; - - bool passed_ = false; - maybe<error_or<T>> storage_ = std::nullopt; - -public: - ~one_time_conveyor_node(); - - void set_feeder(one_time_conveyor_feeder<T> *feeder); - - void feed(T &&value); - void fail(error &&error); - - // ConveyorNode - void get_result(error_or_value &err_or_val) override; - - error_or<own<conveyor_node>> - swap_child(own<conveyor_node> &&swapee) override; - - // ConveyorStorage - size_t space() const override; - size_t queued() const override; - - void child_has_fired() override {} - void parent_has_fired() override; - - // Event - void fire() override; -}; - -/** - * This class buffers and saves incoming elements and acts as an interrupt node - * for processing calls - */ -class queue_buffer_conveyor_node_base : public conveyor_node, - public conveyor_event_storage { -protected: - conveyor_node_with_child_mixin child_mixin_; - -public: - queue_buffer_conveyor_node_base(own<conveyor_node> child_) - : conveyor_event_storage{}, child_mixin_{std::move(child_), *this} {} - virtual ~queue_buffer_conveyor_node_base() = default; - - /** - * Use mixin - */ - error_or<own<conveyor_node>> - swap_child(own<conveyor_node> &&swapee_) noexcept override; - - conveyor_storage *next_storage() noexcept override { - return static_cast<conveyor_storage *>(this); - } -}; - -template <typename T> -class queue_buffer_conveyor_node final - : public queue_buffer_conveyor_node_base { -private: - std::queue<error_or<T>> storage_; - size_t max_store_; - -public: - queue_buffer_conveyor_node(own<conveyor_node> dep, size_t max_size) - : queue_buffer_conveyor_node_base{std::move(dep)}, max_store_{ - max_size} {} - // Event - void fire() override; - // ConveyorNode - void get_result(error_or_value &eov) noexcept override; - - // ConveyorStorage - size_t space() const override; - size_t queued() const override; - - void child_has_fired() override; - void parent_has_fired() override; -}; - -class attach_conveyor_node_base : public conveyor_node { -protected: - conveyor_node_with_child_mixin child_mixin_; - -public: - attach_conveyor_node_base(own<conveyor_node> &&child_) - : child_mixin_{std::move(child_), *this} {} - - virtual ~attach_conveyor_node_base() = default; - - void get_result(error_or_value &err_or_val) noexcept override; - - /** - * Use mixin - */ - error_or<own<conveyor_node>> - swap_child(own<conveyor_node> &&swapee_) noexcept override; - - conveyor_storage *next_storage() noexcept override; -}; - -template <typename... Args> -class attach_conveyor_node final : public attach_conveyor_node_base { -public: - attach_conveyor_node(own<conveyor_node> &&dep, Args &&...args) - : attach_conveyor_node_base(std::move(dep)), attached_data_{ - std::move(args...)} {} - -private: - std::tuple<Args...> attached_data_; -}; - -class convert_conveyor_node_base : public conveyor_node { -public: - convert_conveyor_node_base(own<conveyor_node> &&dep); - virtual ~convert_conveyor_node_base() = default; - - void get_result(error_or_value &err_or_val) override; - - virtual void get_impl(error_or_value &err_or_val) = 0; - - error_or<own<conveyor_node>> - swap_child(own<conveyor_node> &&swapee) noexcept override; - - conveyor_storage *next_storage() noexcept override; - -protected: - conveyor_node_with_child_mixin child_mixin_; -}; - -template <typename T, typename DepT, typename Func, typename ErrorFunc> -class convert_conveyor_node final : public convert_conveyor_node_base { -private: - Func func_; - ErrorFunc error_func_; - - static_assert(std::is_same<DepT, remove_error_or<DepT>>::value, - "Should never be of type ErrorOr"); - -public: - convert_conveyor_node(own<conveyor_node> &&dep, Func &&func, - ErrorFunc &&error_func) - : convert_conveyor_node_base(std::move(dep)), func_{std::move(func)}, - error_func_{std::move(error_func)} {} - - void get_impl(error_or_value &err_or_val) noexcept override { - error_or<unfix_void<DepT>> dep_eov; - error_or<unfix_void<remove_error_or<T>>> &eov = - err_or_val.as<unfix_void<remove_error_or<T>>>(); - if (child_mixin_.child) { - child_mixin_.child->get_result(dep_eov); - if (dep_eov.is_value()) { - try { - - eov = fix_void_caller<T, DepT>::apply( - func_, std::move(dep_eov.get_value())); - } catch (const std::bad_alloc &) { - eov = make_error<err::out_of_memory>("Out of memory"); - } catch (const std::exception &) { - eov = make_error<err::invalid_state>( - "Exception in chain occured. Return ErrorOr<T> if you " - "want to handle errors which are recoverable"); - } - } else if (dep_eov.is_error()) { - eov = error_func_(std::move(dep_eov.get_error())); - } else { - eov = make_error<err::invalid_state>("No value set in dependency"); - } - } else { - eov = make_error<err::invalid_state>("Conveyor doesn't have child"); - } - } -}; - -class sink_conveyor_node final : public conveyor_node, - public conveyor_event_storage { -private: - conveyor_node_with_child_mixin child_mixin_; - conveyor_sink_set *conveyor_sink_; - -public: - sink_conveyor_node(own<conveyor_node> node, conveyor_sink_set &conv_sink) - : conveyor_event_storage{}, child_mixin_{std::move(node), *this}, - conveyor_sink_{&conv_sink} {} - - sink_conveyor_node(own<conveyor_node> node) - : conveyor_event_storage{}, child_mixin_{std::move(node), *this}, - conveyor_sink_{nullptr} {} - - // Event only queued if a critical error occured - void fire() override { - // Queued for destruction of children, because this acts as a sink and - // no other event should be here - child_mixin_.child = nullptr; - - if (conveyor_sink_) { - conveyor_sink_->destroy_sink_conveyor_node(*this); - conveyor_sink_ = nullptr; - } - } - - // ConveyorStorage - size_t space() const override { return 1; } - size_t queued() const override { return 0; } - - // ConveyorNode - void get_result(error_or_value &err_or_val) noexcept override { - err_or_val.as<void_t>() = - make_error<err::invalid_state>("In a sink node no result can be returned"); - } - - error_or<own<conveyor_node>> - swap_child(own<conveyor_node> &&swapee) noexcept override { - return child_mixin_.swap_child(std::move(swapee)); - } - - // ConveyorStorage - void child_has_fired() override { - if (child_mixin_.child) { - error_or<void> dep_eov; - child_mixin_.child->get_result(dep_eov); - if (dep_eov.is_error()) { - if (dep_eov.get_error().is_critical()) { - if (!is_armed()) { - arm_last(); - } - } - if (conveyor_sink_) { - conveyor_sink_->fail(std::move(dep_eov.get_error())); - } - } - } - } - - /* - * No parent needs to be fired since we always have space - */ - void parent_has_fired() override {} - - conveyor_storage *next_storage() override { - // Should never happen though - assert(false); - return nullptr; - // return static_cast<ConveyorStorage*>(this); - } -}; - -class immediate_conveyor_node_base : public conveyor_node, - public conveyor_event_storage { -private: -public: - immediate_conveyor_node_base(); - - virtual ~immediate_conveyor_node_base() = default; - - error_or<own<conveyor_node>> - swap_child(own<conveyor_node> &&swapee) noexcept override { - (void)swapee; - return make_error<err::not_supported>("Node doesn't support swapping"); - } - - conveyor_storage *next_storage() noexcept override { - return static_cast<conveyor_storage *>(this); - } -}; - -template <typename T> -class immediate_conveyor_node final : public immediate_conveyor_node_base { -private: - error_or<fix_void<T>> value_; - uint8_t retrieved_; - -public: - immediate_conveyor_node(fix_void<T> &&val); - immediate_conveyor_node(error &&error); - - // ConveyorStorage - size_t space() const override; - size_t queued() const override; - - void child_has_fired() override; - void parent_has_fired() override; - - // ConveyorNode - void get_result(error_or_value &err_or_val) noexcept override { - if (retrieved_ > 0) { - err_or_val.as<fix_void<T>>() = - make_error<err::buffer_exhausted>("Already taken value"); - } else { - err_or_val.as<fix_void<T>>() = std::move(value_); - } - if (queued() > 0) { - ++retrieved_; - } - } - - // Event - void fire() override; -}; - -/* - * Collects every incoming value and throws it in one lane - */ -class merge_conveyor_node_base : public conveyor_node, - public conveyor_event_storage { -public: - merge_conveyor_node_base(); - - virtual ~merge_conveyor_node_base() = default; - - conveyor_storage *next_storage() noexcept override { - return static_cast<conveyor_storage *>(this); - } -}; - -template <typename T> -class merge_conveyor_node : public merge_conveyor_node_base { -private: - class appendage final : public conveyor_node, public conveyor_storage { - public: - own<conveyor_node> child; - merge_conveyor_node *merger; - - maybe<error_or<fix_void<T>>> error_or_value_; - - public: - appendage(own<conveyor_node> n, merge_conveyor_node &m) - : conveyor_storage{}, child{std::move(n)}, merger{&m}, - error_or_value_{std::nullopt} {} - - bool child_storage_has_element_queued() const { - if (!child) { - return false; - } - conveyor_storage *storage = child->next_storage(); - if (storage) { - return storage->queued() > 0; - } - return false; - } - - void get_appendage_result(error_or_value &eov); - - /** - * ConveyorNode - */ - error_or<own<conveyor_node>> - swap_child(own<conveyor_node> &&swapee_) override; - - conveyor_storage *next_storage() noexcept override { - return static_cast<conveyor_storage *>(this); - } - - void get_result(error_or_value &err_or_val) override; - - /** - * ConveyorStorage - */ - size_t space() const override; - - size_t queued() const override; - - void child_has_fired() override; - - void parent_has_fired() override; - - void set_parent(conveyor_storage *par) override; - }; - - friend class merge_conveyor_node_data<T>; - friend class appendage; - - our<merge_conveyor_node_data<T>> data_; - size_t next_appendage_ = 0; - -public: - merge_conveyor_node(our<merge_conveyor_node_data<T>> data); - ~merge_conveyor_node(); - // ConveyorNode - error_or<own<conveyor_node>> - swap_child(own<conveyor_node> &&c) noexcept override; - - // Event - void get_result(error_or_value &err_or_val) noexcept override; - - void fire() override; - - // ConveyorStorage - size_t space() const override; - size_t queued() const override; - void child_has_fired() override; - void parent_has_fired() override; -}; - -template <typename T> class merge_conveyor_node_data { -public: - std::vector<own<typename merge_conveyor_node<T>::appendage>> appendages; - - merge_conveyor_node<T> *merger = nullptr; - -public: - void attach(conveyor<T> conv); - - void governing_node_destroyed(); -}; - -/* -class JoinConveyorNodeBase : public ConveyorNode, public ConveyorEventStorage { -private: - -public: -}; - -template <typename... Args> -class JoinConveyorNode final : public JoinConveyorNodeBase { -private: - template<typename T> - class Appendage : public ConveyorEventStorage { - private: - Maybe<T> data = std::nullopt; - - public: - size_t space() const override; - size_t queued() const override; - - void fire() override; - void get_result(ErrorOrValue& eov) override; - }; - - std::tuple<Appendage<Args>...> appendages; - -public: -}; - -*/ - -} // namespace saw - -#include "async.tmpl.h" diff --git a/forstio/async/async.tmpl.h b/forstio/async/async.tmpl.h deleted file mode 100644 index d081fa9..0000000 --- a/forstio/async/async.tmpl.h +++ /dev/null @@ -1,769 +0,0 @@ -#pragma once - -#include <forstio/core/common.h> -#include <forstio/core/error.h> - -#include <cassert> -// Template inlining - -#include <iostream> - -namespace saw { - -template <typename Func> conveyor_result<Func, void> execLater(Func &&func) { - conveyor<void> conveyor{fix_void<void>{}}; - return conveyor.then(std::move(func)); -} - -template <typename T> -conveyor<T>::conveyor(fix_void<T> value) : conveyor_base(nullptr) { - // Is there any way to do this? - // @todo new conveyor_base constructor for Immediate values - - own<immediate_conveyor_node<fix_void<T>>> immediate = - heap<immediate_conveyor_node<fix_void<T>>>(std::move(value)); - - if (!immediate) { - return; - } - - node_ = std::move(immediate); -} - -template <typename T> -conveyor<T>::conveyor(error &&err) : conveyor_base(nullptr) { - own<immediate_conveyor_node<fix_void<T>>> immediate = - heap<immediate_conveyor_node<fix_void<T>>>(std::move(err)); - - if (!immediate) { - return; - } - - node_ = std::move(immediate); -} - -template <typename T> -conveyor<T>::conveyor(own<conveyor_node> node_p) - : conveyor_base{std::move(node_p)} {} - -template <typename T> -template <typename Func, typename ErrorFunc> -conveyor_result<Func, T> conveyor<T>::then(Func &&func, - ErrorFunc &&error_func) { - own<conveyor_node> conversion_node = - heap<convert_conveyor_node<fix_void<return_type<Func, T>>, fix_void<T>, - Func, ErrorFunc>>( - std::move(node_), std::move(func), std::move(error_func)); - - return conveyor<remove_error_or<return_type<Func, T>>>::to_conveyor( - std::move(conversion_node)); -} - -template <typename T> conveyor<T> conveyor<T>::buffer(size_t size) { - SAW_ASSERT(node_) { return conveyor<T>{own<conveyor_node>{nullptr}}; } - conveyor_storage *storage = node_->next_storage(); - SAW_ASSERT(storage) { return conveyor<T>{own<conveyor_node>{nullptr}}; } - - own<queue_buffer_conveyor_node<fix_void<T>>> storage_node = - heap<queue_buffer_conveyor_node<fix_void<T>>>(std::move(node_), size); - - conveyor_storage *storage_ptr = - static_cast<conveyor_storage *>(storage_node.get()); - - storage->set_parent(storage_ptr); - return conveyor<T>{std::move(storage_node)}; -} - -template <typename T> -template <typename... Args> -conveyor<T> conveyor<T>::attach(Args &&...args) { - own<attach_conveyor_node<Args...>> attach_node = - heap<attach_conveyor_node<Args...>>(std::move(node_), - std::move(args...)); - return conveyor<T>{std::move(attach_node)}; -} - -template <typename T> -std::pair<conveyor<T>, merge_conveyor<T>> conveyor<T>::merge() { - our<merge_conveyor_node_data<T>> data = - share<merge_conveyor_node_data<T>>(); - - own<merge_conveyor_node<T>> merge_node = heap<merge_conveyor_node<T>>(data); - - SAW_ASSERT(node_) { - return std::make_pair(conveyor<T>{own<conveyor_node>{nullptr}}, - merge_conveyor<T>{}); - } - conveyor_storage *storage = node_->next_storage(); - SAW_ASSERT(storage) { - return std::make_pair(conveyor<T>{own<conveyor_node>{nullptr}}, - merge_conveyor<T>{}); - } - - data->attach(conveyor<T>::to_conveyor(std::move(node_))); - - merge_conveyor<T> node_ref{data}; - - return std::make_pair(conveyor<T>{std::move(merge_node)}, - std::move(node_ref)); -} - -template <> -template <typename ErrorFunc> -conveyor_sink conveyor<void>::sink(ErrorFunc &&error_func) { - conveyor_storage *storage = node_->next_storage(); - SAW_ASSERT(storage) { return conveyor_sink{}; } - - own<sink_conveyor_node> sink_node = - heap<sink_conveyor_node>(std::move(node_)); - conveyor_storage *storage_ptr = - static_cast<conveyor_storage *>(sink_node.get()); - - storage->set_parent(storage_ptr); - - return conveyor_sink{std::move(sink_node)}; -} - -void detach_conveyor(conveyor<void> &&conveyor); - -template <typename T> -template <typename ErrorFunc> -void conveyor<T>::detach(ErrorFunc &&func) { - detach_conveyor(std::move(then([](T &&) {}, std::move(func)))); -} - -template <> -template <typename ErrorFunc> -void conveyor<void>::detach(ErrorFunc &&func) { - detach_conveyor(std::move(then([]() {}, std::move(func)))); -} - -template <typename T> -conveyor<T> conveyor<T>::to_conveyor(own<conveyor_node> node) { - return conveyor<T>{std::move(node)}; -} - -template <typename T> -own<conveyor_node> conveyor<T>::from_conveyor(conveyor<T> conveyor) { - return std::move(conveyor.node_); -} - -template <typename T> error_or<fix_void<T>> conveyor<T>::take() { - SAW_ASSERT(node_) { - return error_or<fix_void<T>>{ - make_error<err::invalid_state>("conveyor in invalid state")}; - } - conveyor_storage *storage = node_->next_storage(); - if (storage) { - if (storage->queued() > 0) { - error_or<fix_void<T>> result; - node_->get_result(result); - return result; - } else { - return error_or<fix_void<T>>{ - make_error<err::buffer_exhausted>("conveyor buffer has no elements")}; - } - } else { - return error_or<fix_void<T>>{ - make_error<err::invalid_state>("conveyor node has no child storage")}; - } -} - -template <typename T> conveyor_and_feeder<T> new_conveyor_and_feeder() { - own<adapt_conveyor_feeder<fix_void<T>>> feeder = - heap<adapt_conveyor_feeder<fix_void<T>>>(); - own<adapt_conveyor_node<fix_void<T>>> node = - heap<adapt_conveyor_node<fix_void<T>>>(); - - feeder->set_feedee(node.get()); - node->set_feeder(feeder.get()); - - return conveyor_and_feeder<T>{std::move(feeder), - conveyor<T>::to_conveyor(std::move(node))}; -} - -// QueueBuffer -template <typename T> void queue_buffer_conveyor_node<T>::fire() { - if (child_mixin_.child) { - if (!storage_.empty()) { - if (storage_.front().is_error()) { - if (storage_.front().get_error().is_critical()) { - child_mixin_.child = nullptr; - } - } - } - } - - bool has_space_before_fire = space() > 0; - - if (parent_) { - parent_->child_has_fired(); - if (!storage_.empty() && parent_->space() > 0) { - arm_later(); - } - } - - if (!child_mixin_.child) { - while (!storage_.empty()) { - storage_.pop(); - } - return; - } - - conveyor_storage *ch_storage = child_mixin_.child->next_storage(); - if (ch_storage && !has_space_before_fire) { - ch_storage->parent_has_fired(); - } -} - -template <typename T> -void queue_buffer_conveyor_node<T>::get_result(error_or_value &eov) noexcept { - error_or<T> &err_or_val = eov.as<T>(); - err_or_val = std::move(storage_.front()); - storage_.pop(); -} - -template <typename T> size_t queue_buffer_conveyor_node<T>::space() const { - return max_store_ - storage_.size(); -} - -template <typename T> size_t queue_buffer_conveyor_node<T>::queued() const { - return storage_.size(); -} - -template <typename T> void queue_buffer_conveyor_node<T>::child_has_fired() { - if (child_mixin_.child && storage_.size() < max_store_) { - error_or<T> eov; - child_mixin_.child->get_result(eov); - - if (eov.is_error()) { - if (eov.get_error().is_critical()) { - } - } - - storage_.push(std::move(eov)); - if (!is_armed()) { - arm_later(); - } - } -} - -template <typename T> void queue_buffer_conveyor_node<T>::parent_has_fired() { - SAW_ASSERT(parent_) { return; } - - if (parent_->space() == 0) { - return; - } - - if (queued() > 0) { - arm_later(); - } -} - -template <typename T> -immediate_conveyor_node<T>::immediate_conveyor_node(fix_void<T> &&val) - : value_{std::move(val)}, retrieved_{0} {} - -template <typename T> -immediate_conveyor_node<T>::immediate_conveyor_node(error &&error) - : value_{std::move(error)}, retrieved_{0} {} - -template <typename T> size_t immediate_conveyor_node<T>::space() const { - return 0; -} - -template <typename T> size_t immediate_conveyor_node<T>::queued() const { - return retrieved_ > 1 ? 0 : 1; -} - -template <typename T> void immediate_conveyor_node<T>::child_has_fired() { - // Impossible case - assert(false); -} - -template <typename T> void immediate_conveyor_node<T>::parent_has_fired() { - SAW_ASSERT(parent_) { return; } - assert(parent_->space() > 0); - - if (queued() > 0) { - arm_next(); - } -} - -template <typename T> void immediate_conveyor_node<T>::fire() { - - if (parent_) { - parent_->child_has_fired(); - if (queued() > 0 && parent_->space() > 0) { - arm_last(); - } - } -} - -template <typename T> -merge_conveyor<T>::merge_conveyor(lent<merge_conveyor_node_data<T>> d) - : data_{std::move(d)} {} - -template <typename T> merge_conveyor<T>::~merge_conveyor() {} - -template <typename T> void merge_conveyor<T>::attach(conveyor<T> conveyor) { - auto sp = data_.lock(); - SAW_ASSERT(sp) { return; } - - sp->attach(std::move(conveyor)); -} - -template <typename T> -merge_conveyor_node<T>::merge_conveyor_node(our<merge_conveyor_node_data<T>> d) - : data_{d} { - SAW_ASSERT(data_) { return; } - - data_->merger = this; -} - -template <typename T> merge_conveyor_node<T>::~merge_conveyor_node() {} - -template <typename T> -error_or<own<conveyor_node>> -merge_conveyor_node<T>::swap_child(own<conveyor_node> &&swapee_) noexcept { - (void)swapee_; - return make_error<err::invalid_state>( - "merge_conveyor_node<T>::appendage should block calls to this class"); -} - -template <typename T> -void merge_conveyor_node<T>::get_result(error_or_value &eov) noexcept { - error_or<fix_void<T>> &err_or_val = eov.as<fix_void<T>>(); - - SAW_ASSERT(data_) { return; } - - /// @todo search appendages for result - - auto &appendages = data_->appendages; - next_appendage_ = std::min(appendages.size(), next_appendage_); - - for (size_t i = next_appendage_; i < appendages.size(); ++i) { - if (appendages[i]->queued() > 0) { - err_or_val = std::move(appendages[i]->error_or_value_.value()); - appendages[i]->error_or_value_ = std::nullopt; - next_appendage_ = i + 1; - return; - } - } - for (size_t i = 0; i < next_appendage_; ++i) { - if (appendages[i]->queued() > 0) { - err_or_val = std::move(appendages[i]->error_or_value_.value()); - appendages[i]->error_or_value_ = std::nullopt; - next_appendage_ = i + 1; - return; - } - } - - err_or_val = make_error<err::invalid_state>("No value in Merge appendages"); -} - -template <typename T> void merge_conveyor_node<T>::fire() { - SAW_ASSERT(queued() > 0) { return; } - - if (parent_) { - parent_->child_has_fired(); - - if (queued() > 0 && parent_->space() > 0) { - arm_later(); - } - } -} - -template <typename T> size_t merge_conveyor_node<T>::space() const { return 0; } - -template <typename T> size_t merge_conveyor_node<T>::queued() const { - SAW_ASSERT(data_) { return 0; } - - size_t queue_count = 0; - - for (auto &iter : data_->appendages) { - queue_count += iter->queued(); - } - - return queue_count; -} - -template <typename T> void merge_conveyor_node<T>::child_has_fired() { - /// This can never happen - assert(false); -} - -template <typename T> void merge_conveyor_node<T>::parent_has_fired() { - SAW_ASSERT(parent_) { return; } - if (queued() > 0) { - if (parent_->space() > 0) { - arm_later(); - } - } -} - -/** - * merge_conveyor_node<T>::Apendage - */ - -template <typename T> -error_or<own<conveyor_node>> -merge_conveyor_node<T>::appendage::swap_child(own<conveyor_node> &&swapee_) { - own<conveyor_node> old_child = std::move(child); - - child = std::move(swapee_); - - // This case should never happen - SAW_ASSERT(old_child) { return make_error<err::invalid_state>("No child exists"); } - - return old_child; -} - -template <typename T> -void merge_conveyor_node<T>::appendage::get_result(error_or_value &eov) { - error_or<fix_void<T>> &err_or_val = eov.as<fix_void<T>>(); - - SAW_ASSERT(queued() > 0) { - err_or_val = - make_error<err::invalid_state>("No element queued in Merge appendage Node"); - return; - } - - err_or_val = std::move(error_or_value_.value()); - error_or_value_ = std::nullopt; -} - -template <typename T> size_t merge_conveyor_node<T>::appendage::space() const { - SAW_ASSERT(merger) { return 0; } - - if (error_or_value_.has_value()) { - return 0; - } - - return 1; -} - -template <typename T> size_t merge_conveyor_node<T>::appendage::queued() const { - SAW_ASSERT(merger) { return 0; } - - if (error_or_value_.has_value()) { - return 1; - } - - return 0; -} - -/// @todo delete this function. Replaced by the regular get_result -template <typename T> -void merge_conveyor_node<T>::appendage::get_appendage_result( - error_or_value &eov) { - error_or<fix_void<T>> &err_or_val = eov.as<fix_void<T>>(); - - SAW_ASSERT(queued() > 0) { - err_or_val = - make_error<err::invalid_state>("No element queued in Merge appendage Node"); - return; - } - - err_or_val = std::move(error_or_value_.value()); - error_or_value_ = std::nullopt; -} - -template <typename T> -void merge_conveyor_node<T>::appendage::child_has_fired() { - SAW_ASSERT(!error_or_value_.has_value()) { return; } - error_or<fix_void<T>> eov; - child->get_result(eov); - - error_or_value_ = std::move(eov); - - if (!merger->is_armed()) { - merger->arm_later(); - } -} - -template <typename T> -void merge_conveyor_node<T>::appendage::parent_has_fired() { - conveyor_storage *child_storage = child->next_storage(); - if (child_storage) { - child_storage->parent_has_fired(); - } -} - -template <typename T> -void merge_conveyor_node<T>::appendage::set_parent(conveyor_storage *par) { - SAW_ASSERT(merger) { return; } - - SAW_ASSERT(child) { return; } - - parent_ = par; -} - -template <typename T> -void merge_conveyor_node_data<T>::attach(conveyor<T> conv) { - auto nas = conveyor<T>::from_conveyor(std::move(conv)); - SAW_ASSERT(nas) { return; } - conveyor_storage *storage = nas->next_storage(); - SAW_ASSERT(storage) { return; } - - auto merge_node_appendage = - heap<typename merge_conveyor_node<T>::appendage>(std::move(nas), - *merger); - auto merge_node_appendage_ptr = merge_node_appendage.get(); - - storage->set_parent(merge_node_appendage.get()); - - SAW_ASSERT(merger) { return; } - - conveyor_storage *mrg_storage = merger->next_storage(); - SAW_ASSERT(mrg_storage) { return; } - - merge_node_appendage->set_parent(mrg_storage); - - appendages.push_back(std::move(merge_node_appendage)); - - /// @todo return this. necessary? maybe for the weird linking setup - /// maybe not - // return merge_node_appendage_ptr; -} - -template <typename T> -void merge_conveyor_node_data<T>::governing_node_destroyed() { - appendages.clear(); - merger = nullptr; -} - -template <typename T> adapt_conveyor_feeder<T>::~adapt_conveyor_feeder() { - if (feedee_) { - feedee_->set_feeder(nullptr); - feedee_ = nullptr; - } -} - -template <typename T> -void adapt_conveyor_feeder<T>::set_feedee(adapt_conveyor_node<T> *feedee_p) { - feedee_ = feedee_p; -} - -template <typename T> void adapt_conveyor_feeder<T>::feed(T &&value) { - if (feedee_) { - feedee_->feed(std::move(value)); - } -} - -template <typename T> void adapt_conveyor_feeder<T>::fail(error &&error) { - if (feedee_) { - feedee_->fail(std::move(error)); - } -} - -template <typename T> size_t adapt_conveyor_feeder<T>::queued() const { - if (feedee_) { - return feedee_->queued(); - } - return 0; -} - -template <typename T> size_t adapt_conveyor_feeder<T>::space() const { - if (feedee_) { - return feedee_->space(); - } - return 0; -} - -template <typename T> -error adapt_conveyor_feeder<T>::swap(conveyor<T> &&conv) noexcept { - SAW_ASSERT(feedee_) { return make_error<err::invalid_state>("No feedee connected"); } - - auto node = conveyor<T>::from_conveyor(std::move(conv)); - - feedee_->swap_child(std::move(node)); - - return no_error(); -} - -template <typename T> -adapt_conveyor_node<T>::adapt_conveyor_node() : conveyor_event_storage{} {} - -template <typename T> adapt_conveyor_node<T>::~adapt_conveyor_node() { - if (feeder_) { - feeder_->set_feedee(nullptr); - feeder_ = nullptr; - } -} - -template <typename T> -error_or<own<conveyor_node>> -adapt_conveyor_node<T>::swap_child(own<conveyor_node> &&swapee) noexcept { - // This should return the owning pointer of this instance - auto myself_err = parent_node_.swap_child_of_parent(std::move(swapee)); - - if (myself_err.is_error()) { - return myself_err; - } - - auto &myself = myself_err.get_value(); - - assert(myself.get() == this); - - return myself_err; -} - -template <typename T> -conveyor_storage *adapt_conveyor_node<T>::next_storage() noexcept { - return static_cast<conveyor_storage *>(this); -} - -template <typename T> -void adapt_conveyor_node<T>::notify_parent_attached( - conveyor_node &par) noexcept { - parent_node_.change_parent(&par); -} - -template <typename T> -void adapt_conveyor_node<T>::set_feeder(adapt_conveyor_feeder<T> *feeder_p) { - feeder_ = feeder_p; -} - -template <typename T> void adapt_conveyor_node<T>::feed(T &&value) { - storage_.push(std::move(value)); - arm_next(); -} - -template <typename T> void adapt_conveyor_node<T>::fail(error &&error) { - storage_.push(std::move(error)); - arm_next(); -} - -template <typename T> size_t adapt_conveyor_node<T>::queued() const { - return storage_.size(); -} - -template <typename T> size_t adapt_conveyor_node<T>::space() const { - return std::numeric_limits<size_t>::max() - storage_.size(); -} - -template <typename T> -void adapt_conveyor_node<T>::get_result(error_or_value &err_or_val) { - if (!storage_.empty()) { - err_or_val.as<T>() = std::move(storage_.front()); - storage_.pop(); - } else { - err_or_val.as<T>() = make_error<err::invalid_state>( - "Signal for retrieval of storage sent even though no " - "data is present"); - } -} - -template <typename T> void adapt_conveyor_node<T>::child_has_fired() { - // Adapt node has no children - assert(false); -} - -template <typename T> void adapt_conveyor_node<T>::parent_has_fired() { - SAW_ASSERT(parent_) { return; } - - if (parent_->space() == 0) { - return; - } -} - -template <typename T> void adapt_conveyor_node<T>::fire() { - if (parent_) { - parent_->child_has_fired(); - - if (storage_.size() > 0) { - arm_later(); - } - } -} - -template <typename T> one_time_conveyor_feeder<T>::~one_time_conveyor_feeder() { - if (feedee_) { - feedee_->set_feeder(nullptr); - feedee_ = nullptr; - } -} - -template <typename T> -void one_time_conveyor_feeder<T>::set_feedee( - one_time_conveyor_node<T> *feedee_p) { - feedee_ = feedee_p; -} - -template <typename T> void one_time_conveyor_feeder<T>::feed(T &&value) { - if (feedee_) { - feedee_->feed(std::move(value)); - } -} - -template <typename T> void one_time_conveyor_feeder<T>::fail(error &&error) { - if (feedee_) { - feedee_->fail(std::move(error)); - } -} - -template <typename T> size_t one_time_conveyor_feeder<T>::queued() const { - if (feedee_) { - return feedee_->queued(); - } - return 0; -} - -template <typename T> size_t one_time_conveyor_feeder<T>::space() const { - if (feedee_) { - return feedee_->space(); - } - return 0; -} - -template <typename T> one_time_conveyor_node<T>::~one_time_conveyor_node() { - if (feeder_) { - feeder_->set_feedee(nullptr); - feeder_ = nullptr; - } -} - -template <typename T> -void one_time_conveyor_node<T>::set_feeder( - one_time_conveyor_feeder<T> *feeder_p) { - feeder_ = feeder_p; -} - -template <typename T> void one_time_conveyor_node<T>::feed(T &&value) { - storage_ = std::move(value); - arm_next(); -} - -template <typename T> void one_time_conveyor_node<T>::fail(error &&error) { - storage_ = std::move(error); - arm_next(); -} - -template <typename T> size_t one_time_conveyor_node<T>::queued() const { - return storage_.has_value() ? 1 : 0; -} - -template <typename T> size_t one_time_conveyor_node<T>::space() const { - return passed_ ? 0 : 1; -} - -template <typename T> -void one_time_conveyor_node<T>::get_result(error_or_value &err_or_val) { - if (storage_.has_value()) { - err_or_val.as<T>() = std::move(storage_.value()); - storage_ = std::nullopt; - } else { - err_or_val.as<T>() = make_error<err::invalid_state>( - "Signal for retrieval of storage sent even though no " - "data is present"); - } -} - -template <typename T> void one_time_conveyor_node<T>::fire() { - if (parent_) { - parent_->child_has_fired(); - } -} - -} // namespace saw |