From a9d2025030d0a7641f4b0701bd4aff7d2db5aeb4 Mon Sep 17 00:00:00 2001 From: "Claudius \"keldu\" Holeksa" Date: Tue, 23 Jan 2024 13:08:41 +0100 Subject: async: Changed file endings --- modules/async/c++/async.h | 1023 -------------------------------------- modules/async/c++/async.hpp | 1023 ++++++++++++++++++++++++++++++++++++++ modules/async/c++/async.tmpl.h | 767 ---------------------------- modules/async/c++/async.tmpl.hpp | 767 ++++++++++++++++++++++++++++ 4 files changed, 1790 insertions(+), 1790 deletions(-) delete mode 100644 modules/async/c++/async.h create mode 100644 modules/async/c++/async.hpp delete mode 100644 modules/async/c++/async.tmpl.h create mode 100644 modules/async/c++/async.tmpl.hpp (limited to 'modules/async') diff --git a/modules/async/c++/async.h b/modules/async/c++/async.h deleted file mode 100644 index 66afa38..0000000 --- a/modules/async/c++/async.h +++ /dev/null @@ -1,1023 +0,0 @@ -#pragma once - -#include -#include - -#include -#include -#include -#include -#include -#include - -namespace saw { -class conveyor_storage; -class conveyor_node { -public: - conveyor_node(); - virtual ~conveyor_node() = default; - - /** - * Internal method to retrieve results from children - */ - virtual void get_result(error_or_value &err_or_val) = 0; - - /** - * Swap out child with another one - */ - virtual error_or> - swap_child(own &&swapee_) = 0; - - /** - * Retrieve the next storage node - */ - virtual conveyor_storage *next_storage() = 0; - - /** - * Notify that a new parent was attached - * Only relevant for the feeding nodes - */ - virtual void notify_parent_attached(conveyor_node &){}; -}; - -class conveyor_node_with_child_mixin final { -public: - own child = nullptr; - - conveyor_node_with_child_mixin(own &&child_, - conveyor_node &owner_); - ~conveyor_node_with_child_mixin() = default; - - /** - * Swap out children and return the child ptr, since the caller is the child - * itself. Stack needs to be cleared before the child is destroyed, so the - * swapped out node is returned as well. - */ - error_or> swap_child(own &&swapee); -}; - -class conveyor_node_with_parent_mixin final { -public: - conveyor_node *parent = nullptr; - - error_or> - swap_child_of_parent(own &&swapee) { - SAW_ASSERT(parent) { - return make_error( - "Can't swap child, because parent doesn't exist"); - } - - return parent->swap_child(std::move(swapee)); - } - void change_parent(conveyor_node *p) { parent = p; } -}; - -class event_loop; -class wait_scope; -/* - * Event class similar to capn'proto. - * https://github.com/capnproto/capnproto - */ -class event { -private: - event_loop &loop_; - event **prev_ = nullptr; - event *next_ = nullptr; - - friend class event_loop; - -public: - event(); - event(event_loop &loop); - virtual ~event(); - - virtual void fire() = 0; - - void arm_next(); - void arm_later(); - void arm_last(); - void disarm(); - - bool is_armed() const; -}; - -class conveyor_storage { -protected: - conveyor_storage *parent_ = nullptr; - -public: - conveyor_storage(); - virtual ~conveyor_storage(); - - virtual size_t space() const = 0; - virtual size_t queued() const = 0; - virtual void child_has_fired() = 0; - virtual void parent_has_fired() = 0; - - virtual void set_parent(conveyor_storage *parent) = 0; - conveyor_storage *get_parent() const; -}; - -class conveyor_event_storage : public conveyor_storage, public event { -public: - conveyor_event_storage(); - virtual ~conveyor_event_storage() = default; - - void set_parent(conveyor_storage *parent) override; -}; - -class conveyor_base { -protected: - own node_; - -public: - conveyor_base(own &&node_p); - virtual ~conveyor_base() = default; - - conveyor_base(conveyor_base &&) = default; - conveyor_base &operator=(conveyor_base &&) = default; - - void get(error_or_value &err_or_val); -}; - -template class conveyor; - -template conveyor chained_conveyor_type(T *); - -// template Conveyor chainedConveyorType(Conveyor *); - -template T remove_error_or_type(T *); - -template T remove_error_or_type(error_or *); - -template -using remove_error_or = decltype(remove_error_or_type((T *)nullptr)); - -template -using chained_conveyors = decltype(chained_conveyor_type((T *)nullptr)); - -template -using conveyor_result = - chained_conveyors>>; - -struct propagate_error { -public: - error operator()(const error &err) const; - error operator()(error &&err); -}; - -class conveyor_sink { -private: - own node_; - -public: - conveyor_sink(); - conveyor_sink(own &&node); - - conveyor_sink(conveyor_sink &&) = default; - conveyor_sink &operator=(conveyor_sink &&) = default; -}; - -template class merge_conveyor_node_data; - -template class merge_conveyor { -private: - lent> data_; - -public: - merge_conveyor() = default; - merge_conveyor(lent> d); - ~merge_conveyor(); - - void attach(conveyor conv); -}; - -/** - * Main interface for async operations. - */ -template class conveyor final : public conveyor_base { -public: - /** - * Construct an immediately fulfilled node - */ - conveyor(fix_void value); - - /** - * Construct an immediately failed node - */ - conveyor(error &&err); - - /** - * Construct a conveyor with a child node - */ - conveyor(own node_p); - - conveyor(conveyor &&) = default; - conveyor &operator=(conveyor &&) = default; - - /** - * This method converts values or errors from children - */ - template - [[nodiscard]] conveyor_result - then(Func &&func, ErrorFunc &&error_func = propagate_error()); - - /** - * This method adds a buffer node in the conveyor chains which acts as a - * scheduler interrupt point and collects elements up to the supplied limit. - */ - [[nodiscard]] conveyor - buffer(size_t limit = std::numeric_limits::max()); - - /** - * This method just takes ownership of any supplied types, - * which are destroyed when the chain gets destroyed. - * Useful for resource lifetime control. - */ - template - [[nodiscard]] conveyor attach(Args &&...args); - - /** @todo implement - * This method limits the total amount of passed elements - * Be careful where you place this node into the chain. - * If you meant to fork it and destroy paths you shouldn't place - * an interrupt point between the fork and this limiter - */ - [[nodiscard]] conveyor limit(size_t val = 1); - - /** - * - */ - [[nodiscard]] std::pair, merge_conveyor> merge(); - - /** - * Moves the conveyor chain into a thread local storage point which drops - * every element. Use sink() if you want to control the lifetime of a - * conveyor chain - */ - template - void detach(ErrorFunc &&err_func = propagate_error()); - /** - * Creates a local sink which drops elements, but lifetime control remains - * in your hand. - */ - template - [[nodiscard]] conveyor_sink - sink(ErrorFunc &&error_func = propagate_error()); - - /** - * If no sink() or detach() is used you have to take elements out of the - * chain yourself. - */ - error_or> take(); - - /** @todo implement - * Specifically pump elements through this chain with the provided - * wait_scope - */ - void poll(wait_scope &wait_scope); - - // helper - static conveyor to_conveyor(own node); - - // helper - static own from_conveyor(conveyor conveyor); -}; - -template conveyor_result exec_later(Func &&func); - -/* - * Join Conveyors into a single one - */ -template -conveyor> -join_conveyors(std::tuple...> &conveyors); - -template class conveyor_feeder { -public: - virtual ~conveyor_feeder() = default; - - virtual void feed(T &&data) = 0; - virtual void fail(error &&error) = 0; - - virtual size_t space() const = 0; - virtual size_t queued() const = 0; - - virtual error swap(conveyor &&conveyor) noexcept = 0; -}; - -template <> class conveyor_feeder { -public: - virtual ~conveyor_feeder() = default; - - virtual void feed(void_t &&value = void_t{}) = 0; - virtual void fail(error &&error) = 0; - - virtual size_t space() const = 0; - virtual size_t queued() const = 0; - - virtual error swap(conveyor &&conveyor) noexcept = 0; -}; - -template struct conveyor_and_feeder { - own> feeder; - class conveyor conveyor; -}; - -template conveyor_and_feeder new_conveyor_and_feeder(); - -template conveyor_and_feeder one_time_conveyor_and_feeder(); - -enum class Signal : uint8_t { Terminate, User1 }; - -/** - * Class which acts as a correspondent between the running framework and outside - * events which may be signals from the operating system or just other threads. - * Default EventPorts are supplied by setupAsyncIo() in io.h - */ -class event_port { -public: - virtual ~event_port() = default; - - virtual conveyor on_signal(Signal signal) = 0; - - virtual void poll() = 0; - virtual void wait() = 0; - virtual void wait(const std::chrono::steady_clock::duration &) = 0; - virtual void wait(const std::chrono::steady_clock::time_point &) = 0; - - virtual void wake() = 0; -}; - -class sink_conveyor_node; - -class conveyor_sink_set final : public event { -private: - /* - class Helper final : public Event { - private: - void destroySinkConveyorNode(ConveyorNode& sink); - void fail(Error&& error); - - std::vector> sink_nodes; - std::queue delete_nodes; - std::function error_handler; - - public: - ConveyorSinks() = default; - ConveyorSinks(EventLoop& event_loop); - - void add(Conveyor node); - - void fire() override {} - }; - - gin::Own helper; - */ - friend class sink_conveyor_node; - - void destroy_sink_conveyor_node(conveyor_node &sink_node); - void fail(error &&err); - - std::list> sink_nodes_; - - std::queue delete_nodes_; - - std::function error_handler_; - -public: - // ConveyorSinks(); - // ConveyorSinks(EventLoop& event_loop); - conveyor_sink_set() = default; - conveyor_sink_set(event_loop &event_loop); - - void add(conveyor &&node); - - void fire() override; -}; - -/* - * EventLoop class similar to capn'proto. - * https://github.com/capnproto/capnproto - */ -class event_loop { -private: - friend class event; - event *head_ = nullptr; - event **tail_ = &head_; - event **next_insert_point_ = &head_; - event **later_insert_point_ = &head_; - - bool is_runnable_ = false; - - own event_port_ = nullptr; - - own daemon_sink_ = nullptr; - - // functions - void set_runnable(bool runnable); - - friend class wait_scope; - void enter_scope(); - void leave_scope(); - - bool turn_loop(); - bool turn(); - -public: - event_loop(); - event_loop(own &&port); - ~event_loop(); - - event_loop(event_loop &&) = default; - event_loop &operator=(event_loop &&) = default; - - bool wait(); - bool wait(const std::chrono::steady_clock::duration &); - bool wait(const std::chrono::steady_clock::time_point &); - bool poll(); - - event_port *get_event_port(); - - conveyor_sink_set &daemon(); -}; - -/* - * WaitScope class similar to capn'proto. - * https://github.com/capnproto/capnproto - */ -class wait_scope { -private: - event_loop &loop_; - -public: - wait_scope(event_loop &loop); - ~wait_scope(); - - void wait(); - void wait(const std::chrono::steady_clock::duration &); - void wait(const std::chrono::steady_clock::time_point &); - void poll(); -}; - -template conveyor_result yield_next(Func &&func); - -template conveyor_result yield_later(Func &&func); - -template conveyor_result yield_last(Func &&func); -} // namespace saw - -// Secret stuff -// Aka private semi hidden classes -namespace saw { - -template struct fix_void_caller { - template static Out apply(Func &func, In &&in) { - return func(std::move(in)); - } -}; - -template struct fix_void_caller { - template static Out apply(Func &func, void_t &&in) { - (void)in; - return func(); - } -}; - -template struct fix_void_caller { - template static void_t apply(Func &func, In &&in) { - func(std::move(in)); - return void_t{}; - } -}; - -template <> struct fix_void_caller { - template static void_t apply(Func &func, void_t &&in) { - (void)in; - func(); - return void_t{}; - } -}; - -template class adapt_conveyor_node; - -template -class adapt_conveyor_feeder final : public conveyor_feeder> { -private: - adapt_conveyor_node *feedee_ = nullptr; - -public: - ~adapt_conveyor_feeder(); - - void set_feedee(adapt_conveyor_node *feedee); - - void feed(T &&value) override; - void fail(error &&error) override; - - size_t space() const override; - size_t queued() const override; - - error swap(conveyor &&conv) noexcept override; -}; - -template -class adapt_conveyor_node final : public conveyor_node, - public conveyor_event_storage { -private: - adapt_conveyor_feeder *feeder_ = nullptr; - - std::queue>> storage_; - - conveyor_node_with_parent_mixin parent_node_; - -public: - adapt_conveyor_node(); - ~adapt_conveyor_node(); - - void set_feeder(adapt_conveyor_feeder *feeder); - - void feed(T &&value); - void fail(error &&error); - - // ConveyorNode - void get_result(error_or_value &err_or_val) override; - - error_or> - swap_child(own &&swapee) noexcept override; - - conveyor_storage *next_storage() noexcept override; - void notify_parent_attached(conveyor_node &) noexcept override; - - // ConveyorStorage - size_t space() const override; - size_t queued() const override; - - void child_has_fired() override; - void parent_has_fired() override; - - // Event - void fire() override; -}; - -template class one_time_conveyor_node; - -template -class one_time_conveyor_feeder final : public conveyor_feeder> { -private: - one_time_conveyor_node *feedee_ = nullptr; - -public: - ~one_time_conveyor_feeder(); - - void set_feedee(one_time_conveyor_node *feedee); - - void feed(T &&value) override; - void fail(error &&error) override; - - size_t space() const override; - size_t queued() const override; -}; - -template -class one_time_conveyor_node final : public conveyor_node, - public conveyor_storage, - public event { -private: - one_time_conveyor_feeder *feeder_ = nullptr; - - bool passed_ = false; - maybe> storage_ = std::nullopt; - -public: - ~one_time_conveyor_node(); - - void set_feeder(one_time_conveyor_feeder *feeder); - - void feed(T &&value); - void fail(error &&error); - - // ConveyorNode - void get_result(error_or_value &err_or_val) override; - - error_or> - swap_child(own &&swapee) override; - - // ConveyorStorage - size_t space() const override; - size_t queued() const override; - - void child_has_fired() override {} - void parent_has_fired() override; - - // Event - void fire() override; -}; - -/** - * This class buffers and saves incoming elements and acts as an interrupt node - * for processing calls - */ -class queue_buffer_conveyor_node_base : public conveyor_node, - public conveyor_event_storage { -protected: - conveyor_node_with_child_mixin child_mixin_; - -public: - queue_buffer_conveyor_node_base(own child_) - : conveyor_event_storage{}, child_mixin_{std::move(child_), *this} {} - virtual ~queue_buffer_conveyor_node_base() = default; - - /** - * Use mixin - */ - error_or> - swap_child(own &&swapee_) noexcept override; - - conveyor_storage *next_storage() noexcept override { - return static_cast(this); - } -}; - -template -class queue_buffer_conveyor_node final - : public queue_buffer_conveyor_node_base { -private: - std::queue> storage_; - size_t max_store_; - -public: - queue_buffer_conveyor_node(own dep, size_t max_size) - : queue_buffer_conveyor_node_base{std::move(dep)}, max_store_{ - max_size} {} - // Event - void fire() override; - // ConveyorNode - void get_result(error_or_value &eov) noexcept override; - - // ConveyorStorage - size_t space() const override; - size_t queued() const override; - - void child_has_fired() override; - void parent_has_fired() override; -}; - -class attach_conveyor_node_base : public conveyor_node { -protected: - conveyor_node_with_child_mixin child_mixin_; - -public: - attach_conveyor_node_base(own &&child_) - : child_mixin_{std::move(child_), *this} {} - - virtual ~attach_conveyor_node_base() = default; - - void get_result(error_or_value &err_or_val) noexcept override; - - /** - * Use mixin - */ - error_or> - swap_child(own &&swapee_) noexcept override; - - conveyor_storage *next_storage() noexcept override; -}; - -template -class attach_conveyor_node final : public attach_conveyor_node_base { -public: - attach_conveyor_node(own &&dep, Args &&...args) - : attach_conveyor_node_base(std::move(dep)), attached_data_{ - std::move(args...)} {} - -private: - std::tuple attached_data_; -}; - -class convert_conveyor_node_base : public conveyor_node { -public: - convert_conveyor_node_base(own &&dep); - virtual ~convert_conveyor_node_base() = default; - - void get_result(error_or_value &err_or_val) override; - - virtual void get_impl(error_or_value &err_or_val) = 0; - - error_or> - swap_child(own &&swapee) noexcept override; - - conveyor_storage *next_storage() noexcept override; - -protected: - conveyor_node_with_child_mixin child_mixin_; -}; - -template -class convert_conveyor_node final : public convert_conveyor_node_base { -private: - Func func_; - ErrorFunc error_func_; - - static_assert(std::is_same>::value, - "Should never be of type ErrorOr"); - -public: - convert_conveyor_node(own &&dep, Func &&func, - ErrorFunc &&error_func) - : convert_conveyor_node_base(std::move(dep)), func_{std::move(func)}, - error_func_{std::move(error_func)} {} - - void get_impl(error_or_value &err_or_val) noexcept override { - error_or> dep_eov; - error_or>> &eov = - err_or_val.as>>(); - if (child_mixin_.child) { - child_mixin_.child->get_result(dep_eov); - if (dep_eov.is_value()) { - try { - - eov = fix_void_caller::apply( - func_, std::move(dep_eov.get_value())); - } catch (const std::bad_alloc &) { - eov = make_error("Out of memory"); - } catch (const std::exception &) { - eov = make_error( - "Exception in chain occured. Return ErrorOr if you " - "want to handle errors which are recoverable"); - } - } else if (dep_eov.is_error()) { - eov = error_func_(std::move(dep_eov.get_error())); - } else { - eov = make_error("No value set in dependency"); - } - } else { - eov = make_error("Conveyor doesn't have child"); - } - } -}; - -class sink_conveyor_node final : public conveyor_node, - public conveyor_event_storage { -private: - conveyor_node_with_child_mixin child_mixin_; - conveyor_sink_set *conveyor_sink_; - -public: - sink_conveyor_node(own node, conveyor_sink_set &conv_sink) - : conveyor_event_storage{}, child_mixin_{std::move(node), *this}, - conveyor_sink_{&conv_sink} {} - - sink_conveyor_node(own node) - : conveyor_event_storage{}, child_mixin_{std::move(node), *this}, - conveyor_sink_{nullptr} {} - - // Event only queued if a critical error occured - void fire() override { - // Queued for destruction of children, because this acts as a sink and - // no other event should be here - child_mixin_.child = nullptr; - - if (conveyor_sink_) { - conveyor_sink_->destroy_sink_conveyor_node(*this); - conveyor_sink_ = nullptr; - } - } - - // ConveyorStorage - size_t space() const override { return 1; } - size_t queued() const override { return 0; } - - // ConveyorNode - void get_result(error_or_value &err_or_val) noexcept override { - err_or_val.as() = - make_error("In a sink node no result can be returned"); - } - - error_or> - swap_child(own &&swapee) noexcept override { - return child_mixin_.swap_child(std::move(swapee)); - } - - // ConveyorStorage - void child_has_fired() override { - if (child_mixin_.child) { - error_or dep_eov; - child_mixin_.child->get_result(dep_eov); - if (dep_eov.is_error()) { - if (dep_eov.get_error().is_critical()) { - if (!is_armed()) { - arm_last(); - } - } - if (conveyor_sink_) { - conveyor_sink_->fail(std::move(dep_eov.get_error())); - } - } - } - } - - /* - * No parent needs to be fired since we always have space - */ - void parent_has_fired() override {} - - conveyor_storage *next_storage() override { - // Should never happen though - assert(false); - return nullptr; - // return static_cast(this); - } -}; - -class immediate_conveyor_node_base : public conveyor_node, - public conveyor_event_storage { -private: -public: - immediate_conveyor_node_base(); - - virtual ~immediate_conveyor_node_base() = default; - - error_or> - swap_child(own &&swapee) noexcept override { - (void)swapee; - return make_error("Node doesn't support swapping"); - } - - conveyor_storage *next_storage() noexcept override { - return static_cast(this); - } -}; - -template -class immediate_conveyor_node final : public immediate_conveyor_node_base { -private: - error_or> value_; - uint8_t retrieved_; - -public: - immediate_conveyor_node(fix_void &&val); - immediate_conveyor_node(error &&error); - - // ConveyorStorage - size_t space() const override; - size_t queued() const override; - - void child_has_fired() override; - void parent_has_fired() override; - - // ConveyorNode - void get_result(error_or_value &err_or_val) noexcept override { - if (retrieved_ > 0) { - err_or_val.as>() = - make_error("Already taken value"); - } else { - err_or_val.as>() = std::move(value_); - } - if (queued() > 0) { - ++retrieved_; - } - } - - // Event - void fire() override; -}; - -/* - * Collects every incoming value and throws it in one lane - */ -class merge_conveyor_node_base : public conveyor_node, - public conveyor_event_storage { -public: - merge_conveyor_node_base(); - - virtual ~merge_conveyor_node_base() = default; - - conveyor_storage *next_storage() noexcept override { - return static_cast(this); - } -}; - -template -class merge_conveyor_node : public merge_conveyor_node_base { -private: - class appendage final : public conveyor_node, public conveyor_storage { - public: - own child; - merge_conveyor_node *merger; - - maybe>> error_or_value_; - - public: - appendage(own n, merge_conveyor_node &m) - : conveyor_storage{}, child{std::move(n)}, merger{&m}, - error_or_value_{std::nullopt} {} - - bool child_storage_has_element_queued() const { - if (!child) { - return false; - } - conveyor_storage *storage = child->next_storage(); - if (storage) { - return storage->queued() > 0; - } - return false; - } - - void get_appendage_result(error_or_value &eov); - - /** - * ConveyorNode - */ - error_or> - swap_child(own &&swapee_) override; - - conveyor_storage *next_storage() noexcept override { - return static_cast(this); - } - - void get_result(error_or_value &err_or_val) override; - - /** - * ConveyorStorage - */ - size_t space() const override; - - size_t queued() const override; - - void child_has_fired() override; - - void parent_has_fired() override; - - void set_parent(conveyor_storage *par) override; - }; - - friend class merge_conveyor_node_data; - friend class appendage; - - our> data_; - size_t next_appendage_ = 0; - -public: - merge_conveyor_node(our> data); - ~merge_conveyor_node(); - // ConveyorNode - error_or> - swap_child(own &&c) noexcept override; - - // Event - void get_result(error_or_value &err_or_val) noexcept override; - - void fire() override; - - // ConveyorStorage - size_t space() const override; - size_t queued() const override; - void child_has_fired() override; - void parent_has_fired() override; -}; - -template class merge_conveyor_node_data { -public: - std::vector::appendage>> appendages; - - merge_conveyor_node *merger = nullptr; - -public: - void attach(conveyor conv); - - void governing_node_destroyed(); -}; - -/* -class JoinConveyorNodeBase : public ConveyorNode, public ConveyorEventStorage { -private: - -public: -}; - -template -class JoinConveyorNode final : public JoinConveyorNodeBase { -private: - template - class Appendage : public ConveyorEventStorage { - private: - Maybe data = std::nullopt; - - public: - size_t space() const override; - size_t queued() const override; - - void fire() override; - void get_result(ErrorOrValue& eov) override; - }; - - std::tuple...> appendages; - -public: -}; - -*/ - -} // namespace saw - -#include "async.tmpl.h" diff --git a/modules/async/c++/async.hpp b/modules/async/c++/async.hpp new file mode 100644 index 0000000..92ada9c --- /dev/null +++ b/modules/async/c++/async.hpp @@ -0,0 +1,1023 @@ +#pragma once + +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace saw { +class conveyor_storage; +class conveyor_node { +public: + conveyor_node(); + virtual ~conveyor_node() = default; + + /** + * Internal method to retrieve results from children + */ + virtual void get_result(error_or_value &err_or_val) = 0; + + /** + * Swap out child with another one + */ + virtual error_or> + swap_child(own &&swapee_) = 0; + + /** + * Retrieve the next storage node + */ + virtual conveyor_storage *next_storage() = 0; + + /** + * Notify that a new parent was attached + * Only relevant for the feeding nodes + */ + virtual void notify_parent_attached(conveyor_node &){}; +}; + +class conveyor_node_with_child_mixin final { +public: + own child = nullptr; + + conveyor_node_with_child_mixin(own &&child_, + conveyor_node &owner_); + ~conveyor_node_with_child_mixin() = default; + + /** + * Swap out children and return the child ptr, since the caller is the child + * itself. Stack needs to be cleared before the child is destroyed, so the + * swapped out node is returned as well. + */ + error_or> swap_child(own &&swapee); +}; + +class conveyor_node_with_parent_mixin final { +public: + conveyor_node *parent = nullptr; + + error_or> + swap_child_of_parent(own &&swapee) { + SAW_ASSERT(parent) { + return make_error( + "Can't swap child, because parent doesn't exist"); + } + + return parent->swap_child(std::move(swapee)); + } + void change_parent(conveyor_node *p) { parent = p; } +}; + +class event_loop; +class wait_scope; +/* + * Event class similar to capn'proto. + * https://github.com/capnproto/capnproto + */ +class event { +private: + event_loop &loop_; + event **prev_ = nullptr; + event *next_ = nullptr; + + friend class event_loop; + +public: + event(); + event(event_loop &loop); + virtual ~event(); + + virtual void fire() = 0; + + void arm_next(); + void arm_later(); + void arm_last(); + void disarm(); + + bool is_armed() const; +}; + +class conveyor_storage { +protected: + conveyor_storage *parent_ = nullptr; + +public: + conveyor_storage(); + virtual ~conveyor_storage(); + + virtual size_t space() const = 0; + virtual size_t queued() const = 0; + virtual void child_has_fired() = 0; + virtual void parent_has_fired() = 0; + + virtual void set_parent(conveyor_storage *parent) = 0; + conveyor_storage *get_parent() const; +}; + +class conveyor_event_storage : public conveyor_storage, public event { +public: + conveyor_event_storage(); + virtual ~conveyor_event_storage() = default; + + void set_parent(conveyor_storage *parent) override; +}; + +class conveyor_base { +protected: + own node_; + +public: + conveyor_base(own &&node_p); + virtual ~conveyor_base() = default; + + conveyor_base(conveyor_base &&) = default; + conveyor_base &operator=(conveyor_base &&) = default; + + void get(error_or_value &err_or_val); +}; + +template class conveyor; + +template conveyor chained_conveyor_type(T *); + +// template Conveyor chainedConveyorType(Conveyor *); + +template T remove_error_or_type(T *); + +template T remove_error_or_type(error_or *); + +template +using remove_error_or = decltype(remove_error_or_type((T *)nullptr)); + +template +using chained_conveyors = decltype(chained_conveyor_type((T *)nullptr)); + +template +using conveyor_result = + chained_conveyors>>; + +struct propagate_error { +public: + error operator()(const error &err) const; + error operator()(error &&err); +}; + +class conveyor_sink { +private: + own node_; + +public: + conveyor_sink(); + conveyor_sink(own &&node); + + conveyor_sink(conveyor_sink &&) = default; + conveyor_sink &operator=(conveyor_sink &&) = default; +}; + +template class merge_conveyor_node_data; + +template class merge_conveyor { +private: + lent> data_; + +public: + merge_conveyor() = default; + merge_conveyor(lent> d); + ~merge_conveyor(); + + void attach(conveyor conv); +}; + +/** + * Main interface for async operations. + */ +template class conveyor final : public conveyor_base { +public: + /** + * Construct an immediately fulfilled node + */ + conveyor(fix_void value); + + /** + * Construct an immediately failed node + */ + conveyor(error &&err); + + /** + * Construct a conveyor with a child node + */ + conveyor(own node_p); + + conveyor(conveyor &&) = default; + conveyor &operator=(conveyor &&) = default; + + /** + * This method converts values or errors from children + */ + template + [[nodiscard]] conveyor_result + then(Func &&func, ErrorFunc &&error_func = propagate_error()); + + /** + * This method adds a buffer node in the conveyor chains which acts as a + * scheduler interrupt point and collects elements up to the supplied limit. + */ + [[nodiscard]] conveyor + buffer(size_t limit = std::numeric_limits::max()); + + /** + * This method just takes ownership of any supplied types, + * which are destroyed when the chain gets destroyed. + * Useful for resource lifetime control. + */ + template + [[nodiscard]] conveyor attach(Args &&...args); + + /** @todo implement + * This method limits the total amount of passed elements + * Be careful where you place this node into the chain. + * If you meant to fork it and destroy paths you shouldn't place + * an interrupt point between the fork and this limiter + */ + [[nodiscard]] conveyor limit(size_t val = 1); + + /** + * + */ + [[nodiscard]] std::pair, merge_conveyor> merge(); + + /** + * Moves the conveyor chain into a thread local storage point which drops + * every element. Use sink() if you want to control the lifetime of a + * conveyor chain + */ + template + void detach(ErrorFunc &&err_func = propagate_error()); + /** + * Creates a local sink which drops elements, but lifetime control remains + * in your hand. + */ + template + [[nodiscard]] conveyor_sink + sink(ErrorFunc &&error_func = propagate_error()); + + /** + * If no sink() or detach() is used you have to take elements out of the + * chain yourself. + */ + error_or> take(); + + /** @todo implement + * Specifically pump elements through this chain with the provided + * wait_scope + */ + void poll(wait_scope &wait_scope); + + // helper + static conveyor to_conveyor(own node); + + // helper + static own from_conveyor(conveyor conveyor); +}; + +template conveyor_result exec_later(Func &&func); + +/* + * Join Conveyors into a single one + */ +template +conveyor> +join_conveyors(std::tuple...> &conveyors); + +template class conveyor_feeder { +public: + virtual ~conveyor_feeder() = default; + + virtual void feed(T &&data) = 0; + virtual void fail(error &&error) = 0; + + virtual size_t space() const = 0; + virtual size_t queued() const = 0; + + virtual error swap(conveyor &&conveyor) noexcept = 0; +}; + +template <> class conveyor_feeder { +public: + virtual ~conveyor_feeder() = default; + + virtual void feed(void_t &&value = void_t{}) = 0; + virtual void fail(error &&error) = 0; + + virtual size_t space() const = 0; + virtual size_t queued() const = 0; + + virtual error swap(conveyor &&conveyor) noexcept = 0; +}; + +template struct conveyor_and_feeder { + own> feeder; + class conveyor conveyor; +}; + +template conveyor_and_feeder new_conveyor_and_feeder(); + +template conveyor_and_feeder one_time_conveyor_and_feeder(); + +enum class Signal : uint8_t { Terminate, User1 }; + +/** + * Class which acts as a correspondent between the running framework and outside + * events which may be signals from the operating system or just other threads. + * Default EventPorts are supplied by setupAsyncIo() in io.h + */ +class event_port { +public: + virtual ~event_port() = default; + + virtual conveyor on_signal(Signal signal) = 0; + + virtual void poll() = 0; + virtual void wait() = 0; + virtual void wait(const std::chrono::steady_clock::duration &) = 0; + virtual void wait(const std::chrono::steady_clock::time_point &) = 0; + + virtual void wake() = 0; +}; + +class sink_conveyor_node; + +class conveyor_sink_set final : public event { +private: + /* + class Helper final : public Event { + private: + void destroySinkConveyorNode(ConveyorNode& sink); + void fail(Error&& error); + + std::vector> sink_nodes; + std::queue delete_nodes; + std::function error_handler; + + public: + ConveyorSinks() = default; + ConveyorSinks(EventLoop& event_loop); + + void add(Conveyor node); + + void fire() override {} + }; + + gin::Own helper; + */ + friend class sink_conveyor_node; + + void destroy_sink_conveyor_node(conveyor_node &sink_node); + void fail(error &&err); + + std::list> sink_nodes_; + + std::queue delete_nodes_; + + std::function error_handler_; + +public: + // ConveyorSinks(); + // ConveyorSinks(EventLoop& event_loop); + conveyor_sink_set() = default; + conveyor_sink_set(event_loop &event_loop); + + void add(conveyor &&node); + + void fire() override; +}; + +/* + * EventLoop class similar to capn'proto. + * https://github.com/capnproto/capnproto + */ +class event_loop { +private: + friend class event; + event *head_ = nullptr; + event **tail_ = &head_; + event **next_insert_point_ = &head_; + event **later_insert_point_ = &head_; + + bool is_runnable_ = false; + + own event_port_ = nullptr; + + own daemon_sink_ = nullptr; + + // functions + void set_runnable(bool runnable); + + friend class wait_scope; + void enter_scope(); + void leave_scope(); + + bool turn_loop(); + bool turn(); + +public: + event_loop(); + event_loop(own &&port); + ~event_loop(); + + event_loop(event_loop &&) = default; + event_loop &operator=(event_loop &&) = default; + + bool wait(); + bool wait(const std::chrono::steady_clock::duration &); + bool wait(const std::chrono::steady_clock::time_point &); + bool poll(); + + event_port *get_event_port(); + + conveyor_sink_set &daemon(); +}; + +/* + * WaitScope class similar to capn'proto. + * https://github.com/capnproto/capnproto + */ +class wait_scope { +private: + event_loop &loop_; + +public: + wait_scope(event_loop &loop); + ~wait_scope(); + + void wait(); + void wait(const std::chrono::steady_clock::duration &); + void wait(const std::chrono::steady_clock::time_point &); + void poll(); +}; + +template conveyor_result yield_next(Func &&func); + +template conveyor_result yield_later(Func &&func); + +template conveyor_result yield_last(Func &&func); +} // namespace saw + +// Secret stuff +// Aka private semi hidden classes +namespace saw { + +template struct fix_void_caller { + template static Out apply(Func &func, In &&in) { + return func(std::move(in)); + } +}; + +template struct fix_void_caller { + template static Out apply(Func &func, void_t &&in) { + (void)in; + return func(); + } +}; + +template struct fix_void_caller { + template static void_t apply(Func &func, In &&in) { + func(std::move(in)); + return void_t{}; + } +}; + +template <> struct fix_void_caller { + template static void_t apply(Func &func, void_t &&in) { + (void)in; + func(); + return void_t{}; + } +}; + +template class adapt_conveyor_node; + +template +class adapt_conveyor_feeder final : public conveyor_feeder> { +private: + adapt_conveyor_node *feedee_ = nullptr; + +public: + ~adapt_conveyor_feeder(); + + void set_feedee(adapt_conveyor_node *feedee); + + void feed(T &&value) override; + void fail(error &&error) override; + + size_t space() const override; + size_t queued() const override; + + error swap(conveyor &&conv) noexcept override; +}; + +template +class adapt_conveyor_node final : public conveyor_node, + public conveyor_event_storage { +private: + adapt_conveyor_feeder *feeder_ = nullptr; + + std::queue>> storage_; + + conveyor_node_with_parent_mixin parent_node_; + +public: + adapt_conveyor_node(); + ~adapt_conveyor_node(); + + void set_feeder(adapt_conveyor_feeder *feeder); + + void feed(T &&value); + void fail(error &&error); + + // ConveyorNode + void get_result(error_or_value &err_or_val) override; + + error_or> + swap_child(own &&swapee) noexcept override; + + conveyor_storage *next_storage() noexcept override; + void notify_parent_attached(conveyor_node &) noexcept override; + + // ConveyorStorage + size_t space() const override; + size_t queued() const override; + + void child_has_fired() override; + void parent_has_fired() override; + + // Event + void fire() override; +}; + +template class one_time_conveyor_node; + +template +class one_time_conveyor_feeder final : public conveyor_feeder> { +private: + one_time_conveyor_node *feedee_ = nullptr; + +public: + ~one_time_conveyor_feeder(); + + void set_feedee(one_time_conveyor_node *feedee); + + void feed(T &&value) override; + void fail(error &&error) override; + + size_t space() const override; + size_t queued() const override; +}; + +template +class one_time_conveyor_node final : public conveyor_node, + public conveyor_storage, + public event { +private: + one_time_conveyor_feeder *feeder_ = nullptr; + + bool passed_ = false; + maybe> storage_ = std::nullopt; + +public: + ~one_time_conveyor_node(); + + void set_feeder(one_time_conveyor_feeder *feeder); + + void feed(T &&value); + void fail(error &&error); + + // ConveyorNode + void get_result(error_or_value &err_or_val) override; + + error_or> + swap_child(own &&swapee) override; + + // ConveyorStorage + size_t space() const override; + size_t queued() const override; + + void child_has_fired() override {} + void parent_has_fired() override; + + // Event + void fire() override; +}; + +/** + * This class buffers and saves incoming elements and acts as an interrupt node + * for processing calls + */ +class queue_buffer_conveyor_node_base : public conveyor_node, + public conveyor_event_storage { +protected: + conveyor_node_with_child_mixin child_mixin_; + +public: + queue_buffer_conveyor_node_base(own child_) + : conveyor_event_storage{}, child_mixin_{std::move(child_), *this} {} + virtual ~queue_buffer_conveyor_node_base() = default; + + /** + * Use mixin + */ + error_or> + swap_child(own &&swapee_) noexcept override; + + conveyor_storage *next_storage() noexcept override { + return static_cast(this); + } +}; + +template +class queue_buffer_conveyor_node final + : public queue_buffer_conveyor_node_base { +private: + std::queue> storage_; + size_t max_store_; + +public: + queue_buffer_conveyor_node(own dep, size_t max_size) + : queue_buffer_conveyor_node_base{std::move(dep)}, max_store_{ + max_size} {} + // Event + void fire() override; + // ConveyorNode + void get_result(error_or_value &eov) noexcept override; + + // ConveyorStorage + size_t space() const override; + size_t queued() const override; + + void child_has_fired() override; + void parent_has_fired() override; +}; + +class attach_conveyor_node_base : public conveyor_node { +protected: + conveyor_node_with_child_mixin child_mixin_; + +public: + attach_conveyor_node_base(own &&child_) + : child_mixin_{std::move(child_), *this} {} + + virtual ~attach_conveyor_node_base() = default; + + void get_result(error_or_value &err_or_val) noexcept override; + + /** + * Use mixin + */ + error_or> + swap_child(own &&swapee_) noexcept override; + + conveyor_storage *next_storage() noexcept override; +}; + +template +class attach_conveyor_node final : public attach_conveyor_node_base { +public: + attach_conveyor_node(own &&dep, Args &&...args) + : attach_conveyor_node_base(std::move(dep)), attached_data_{ + std::move(args...)} {} + +private: + std::tuple attached_data_; +}; + +class convert_conveyor_node_base : public conveyor_node { +public: + convert_conveyor_node_base(own &&dep); + virtual ~convert_conveyor_node_base() = default; + + void get_result(error_or_value &err_or_val) override; + + virtual void get_impl(error_or_value &err_or_val) = 0; + + error_or> + swap_child(own &&swapee) noexcept override; + + conveyor_storage *next_storage() noexcept override; + +protected: + conveyor_node_with_child_mixin child_mixin_; +}; + +template +class convert_conveyor_node final : public convert_conveyor_node_base { +private: + Func func_; + ErrorFunc error_func_; + + static_assert(std::is_same>::value, + "Should never be of type ErrorOr"); + +public: + convert_conveyor_node(own &&dep, Func &&func, + ErrorFunc &&error_func) + : convert_conveyor_node_base(std::move(dep)), func_{std::move(func)}, + error_func_{std::move(error_func)} {} + + void get_impl(error_or_value &err_or_val) noexcept override { + error_or> dep_eov; + error_or>> &eov = + err_or_val.as>>(); + if (child_mixin_.child) { + child_mixin_.child->get_result(dep_eov); + if (dep_eov.is_value()) { + try { + + eov = fix_void_caller::apply( + func_, std::move(dep_eov.get_value())); + } catch (const std::bad_alloc &) { + eov = make_error("Out of memory"); + } catch (const std::exception &) { + eov = make_error( + "Exception in chain occured. Return ErrorOr if you " + "want to handle errors which are recoverable"); + } + } else if (dep_eov.is_error()) { + eov = error_func_(std::move(dep_eov.get_error())); + } else { + eov = make_error("No value set in dependency"); + } + } else { + eov = make_error("Conveyor doesn't have child"); + } + } +}; + +class sink_conveyor_node final : public conveyor_node, + public conveyor_event_storage { +private: + conveyor_node_with_child_mixin child_mixin_; + conveyor_sink_set *conveyor_sink_; + +public: + sink_conveyor_node(own node, conveyor_sink_set &conv_sink) + : conveyor_event_storage{}, child_mixin_{std::move(node), *this}, + conveyor_sink_{&conv_sink} {} + + sink_conveyor_node(own node) + : conveyor_event_storage{}, child_mixin_{std::move(node), *this}, + conveyor_sink_{nullptr} {} + + // Event only queued if a critical error occured + void fire() override { + // Queued for destruction of children, because this acts as a sink and + // no other event should be here + child_mixin_.child = nullptr; + + if (conveyor_sink_) { + conveyor_sink_->destroy_sink_conveyor_node(*this); + conveyor_sink_ = nullptr; + } + } + + // ConveyorStorage + size_t space() const override { return 1; } + size_t queued() const override { return 0; } + + // ConveyorNode + void get_result(error_or_value &err_or_val) noexcept override { + err_or_val.as() = + make_error("In a sink node no result can be returned"); + } + + error_or> + swap_child(own &&swapee) noexcept override { + return child_mixin_.swap_child(std::move(swapee)); + } + + // ConveyorStorage + void child_has_fired() override { + if (child_mixin_.child) { + error_or dep_eov; + child_mixin_.child->get_result(dep_eov); + if (dep_eov.is_error()) { + if (dep_eov.get_error().is_critical()) { + if (!is_armed()) { + arm_last(); + } + } + if (conveyor_sink_) { + conveyor_sink_->fail(std::move(dep_eov.get_error())); + } + } + } + } + + /* + * No parent needs to be fired since we always have space + */ + void parent_has_fired() override {} + + conveyor_storage *next_storage() override { + // Should never happen though + assert(false); + return nullptr; + // return static_cast(this); + } +}; + +class immediate_conveyor_node_base : public conveyor_node, + public conveyor_event_storage { +private: +public: + immediate_conveyor_node_base(); + + virtual ~immediate_conveyor_node_base() = default; + + error_or> + swap_child(own &&swapee) noexcept override { + (void)swapee; + return make_error("Node doesn't support swapping"); + } + + conveyor_storage *next_storage() noexcept override { + return static_cast(this); + } +}; + +template +class immediate_conveyor_node final : public immediate_conveyor_node_base { +private: + error_or> value_; + uint8_t retrieved_; + +public: + immediate_conveyor_node(fix_void &&val); + immediate_conveyor_node(error &&error); + + // ConveyorStorage + size_t space() const override; + size_t queued() const override; + + void child_has_fired() override; + void parent_has_fired() override; + + // ConveyorNode + void get_result(error_or_value &err_or_val) noexcept override { + if (retrieved_ > 0) { + err_or_val.as>() = + make_error("Already taken value"); + } else { + err_or_val.as>() = std::move(value_); + } + if (queued() > 0) { + ++retrieved_; + } + } + + // Event + void fire() override; +}; + +/* + * Collects every incoming value and throws it in one lane + */ +class merge_conveyor_node_base : public conveyor_node, + public conveyor_event_storage { +public: + merge_conveyor_node_base(); + + virtual ~merge_conveyor_node_base() = default; + + conveyor_storage *next_storage() noexcept override { + return static_cast(this); + } +}; + +template +class merge_conveyor_node : public merge_conveyor_node_base { +private: + class appendage final : public conveyor_node, public conveyor_storage { + public: + own child; + merge_conveyor_node *merger; + + maybe>> error_or_value_; + + public: + appendage(own n, merge_conveyor_node &m) + : conveyor_storage{}, child{std::move(n)}, merger{&m}, + error_or_value_{std::nullopt} {} + + bool child_storage_has_element_queued() const { + if (!child) { + return false; + } + conveyor_storage *storage = child->next_storage(); + if (storage) { + return storage->queued() > 0; + } + return false; + } + + void get_appendage_result(error_or_value &eov); + + /** + * ConveyorNode + */ + error_or> + swap_child(own &&swapee_) override; + + conveyor_storage *next_storage() noexcept override { + return static_cast(this); + } + + void get_result(error_or_value &err_or_val) override; + + /** + * ConveyorStorage + */ + size_t space() const override; + + size_t queued() const override; + + void child_has_fired() override; + + void parent_has_fired() override; + + void set_parent(conveyor_storage *par) override; + }; + + friend class merge_conveyor_node_data; + friend class appendage; + + our> data_; + size_t next_appendage_ = 0; + +public: + merge_conveyor_node(our> data); + ~merge_conveyor_node(); + // ConveyorNode + error_or> + swap_child(own &&c) noexcept override; + + // Event + void get_result(error_or_value &err_or_val) noexcept override; + + void fire() override; + + // ConveyorStorage + size_t space() const override; + size_t queued() const override; + void child_has_fired() override; + void parent_has_fired() override; +}; + +template class merge_conveyor_node_data { +public: + std::vector::appendage>> appendages; + + merge_conveyor_node *merger = nullptr; + +public: + void attach(conveyor conv); + + void governing_node_destroyed(); +}; + +/* +class JoinConveyorNodeBase : public ConveyorNode, public ConveyorEventStorage { +private: + +public: +}; + +template +class JoinConveyorNode final : public JoinConveyorNodeBase { +private: + template + class Appendage : public ConveyorEventStorage { + private: + Maybe data = std::nullopt; + + public: + size_t space() const override; + size_t queued() const override; + + void fire() override; + void get_result(ErrorOrValue& eov) override; + }; + + std::tuple...> appendages; + +public: +}; + +*/ + +} // namespace saw + +#include "async.tmpl.hpp diff --git a/modules/async/c++/async.tmpl.h b/modules/async/c++/async.tmpl.h deleted file mode 100644 index 8fcb59c..0000000 --- a/modules/async/c++/async.tmpl.h +++ /dev/null @@ -1,767 +0,0 @@ -#pragma once - -#include -#include - -#include -// Template inlining - -namespace saw { - -template conveyor_result execLater(Func &&func) { - conveyor conveyor{fix_void{}}; - return conveyor.then(std::move(func)); -} - -template -conveyor::conveyor(fix_void value) : conveyor_base(nullptr) { - // Is there any way to do this? - // @todo new conveyor_base constructor for Immediate values - - own>> immediate = - heap>>(std::move(value)); - - if (!immediate) { - return; - } - - node_ = std::move(immediate); -} - -template -conveyor::conveyor(error &&err) : conveyor_base(nullptr) { - own>> immediate = - heap>>(std::move(err)); - - if (!immediate) { - return; - } - - node_ = std::move(immediate); -} - -template -conveyor::conveyor(own node_p) - : conveyor_base{std::move(node_p)} {} - -template -template -conveyor_result conveyor::then(Func &&func, - ErrorFunc &&error_func) { - own conversion_node = - heap>, fix_void, - Func, ErrorFunc>>( - std::move(node_), std::move(func), std::move(error_func)); - - return conveyor>>::to_conveyor( - std::move(conversion_node)); -} - -template conveyor conveyor::buffer(size_t size) { - SAW_ASSERT(node_) { return conveyor{own{nullptr}}; } - conveyor_storage *storage = node_->next_storage(); - SAW_ASSERT(storage) { return conveyor{own{nullptr}}; } - - own>> storage_node = - heap>>(std::move(node_), size); - - conveyor_storage *storage_ptr = - static_cast(storage_node.get()); - - storage->set_parent(storage_ptr); - return conveyor{std::move(storage_node)}; -} - -template -template -conveyor conveyor::attach(Args &&...args) { - own> attach_node = - heap>(std::move(node_), - std::move(args...)); - return conveyor{std::move(attach_node)}; -} - -template -std::pair, merge_conveyor> conveyor::merge() { - our> data = - share>(); - - own> merge_node = heap>(data); - - SAW_ASSERT(node_) { - return std::make_pair(conveyor{own{nullptr}}, - merge_conveyor{}); - } - conveyor_storage *storage = node_->next_storage(); - SAW_ASSERT(storage) { - return std::make_pair(conveyor{own{nullptr}}, - merge_conveyor{}); - } - - data->attach(conveyor::to_conveyor(std::move(node_))); - - merge_conveyor node_ref{data}; - - return std::make_pair(conveyor{std::move(merge_node)}, - std::move(node_ref)); -} - -template <> -template -conveyor_sink conveyor::sink(ErrorFunc &&error_func) { - conveyor_storage *storage = node_->next_storage(); - SAW_ASSERT(storage) { return conveyor_sink{}; } - - own sink_node = - heap(std::move(node_)); - conveyor_storage *storage_ptr = - static_cast(sink_node.get()); - - storage->set_parent(storage_ptr); - - return conveyor_sink{std::move(sink_node)}; -} - -void detach_conveyor(conveyor &&conveyor); - -template -template -void conveyor::detach(ErrorFunc &&func) { - detach_conveyor(std::move(then([](T &&) {}, std::move(func)))); -} - -template <> -template -void conveyor::detach(ErrorFunc &&func) { - detach_conveyor(std::move(then([]() {}, std::move(func)))); -} - -template -conveyor conveyor::to_conveyor(own node) { - return conveyor{std::move(node)}; -} - -template -own conveyor::from_conveyor(conveyor conveyor) { - return std::move(conveyor.node_); -} - -template error_or> conveyor::take() { - SAW_ASSERT(node_) { - return error_or>{ - make_error("conveyor in invalid state")}; - } - conveyor_storage *storage = node_->next_storage(); - if (storage) { - if (storage->queued() > 0) { - error_or> result; - node_->get_result(result); - return result; - } else { - return error_or>{ - make_error("conveyor buffer has no elements")}; - } - } else { - return error_or>{ - make_error("conveyor node has no child storage")}; - } -} - -template conveyor_and_feeder new_conveyor_and_feeder() { - own>> feeder = - heap>>(); - own>> node = - heap>>(); - - feeder->set_feedee(node.get()); - node->set_feeder(feeder.get()); - - return conveyor_and_feeder{std::move(feeder), - conveyor::to_conveyor(std::move(node))}; -} - -// QueueBuffer -template void queue_buffer_conveyor_node::fire() { - if (child_mixin_.child) { - if (!storage_.empty()) { - if (storage_.front().is_error()) { - if (storage_.front().get_error().is_critical()) { - child_mixin_.child = nullptr; - } - } - } - } - - bool has_space_before_fire = space() > 0; - - if (parent_) { - parent_->child_has_fired(); - if (!storage_.empty() && parent_->space() > 0) { - arm_later(); - } - } - - if (!child_mixin_.child) { - while (!storage_.empty()) { - storage_.pop(); - } - return; - } - - conveyor_storage *ch_storage = child_mixin_.child->next_storage(); - if (ch_storage && !has_space_before_fire) { - ch_storage->parent_has_fired(); - } -} - -template -void queue_buffer_conveyor_node::get_result(error_or_value &eov) noexcept { - error_or &err_or_val = eov.as(); - err_or_val = std::move(storage_.front()); - storage_.pop(); -} - -template size_t queue_buffer_conveyor_node::space() const { - return max_store_ - storage_.size(); -} - -template size_t queue_buffer_conveyor_node::queued() const { - return storage_.size(); -} - -template void queue_buffer_conveyor_node::child_has_fired() { - if (child_mixin_.child && storage_.size() < max_store_) { - error_or eov; - child_mixin_.child->get_result(eov); - - if (eov.is_error()) { - if (eov.get_error().is_critical()) { - } - } - - storage_.push(std::move(eov)); - if (!is_armed()) { - arm_later(); - } - } -} - -template void queue_buffer_conveyor_node::parent_has_fired() { - SAW_ASSERT(parent_) { return; } - - if (parent_->space() == 0) { - return; - } - - if (queued() > 0) { - arm_later(); - } -} - -template -immediate_conveyor_node::immediate_conveyor_node(fix_void &&val) - : value_{std::move(val)}, retrieved_{0} {} - -template -immediate_conveyor_node::immediate_conveyor_node(error &&error) - : value_{std::move(error)}, retrieved_{0} {} - -template size_t immediate_conveyor_node::space() const { - return 0; -} - -template size_t immediate_conveyor_node::queued() const { - return retrieved_ > 1 ? 0 : 1; -} - -template void immediate_conveyor_node::child_has_fired() { - // Impossible case - assert(false); -} - -template void immediate_conveyor_node::parent_has_fired() { - SAW_ASSERT(parent_) { return; } - assert(parent_->space() > 0); - - if (queued() > 0) { - arm_next(); - } -} - -template void immediate_conveyor_node::fire() { - - if (parent_) { - parent_->child_has_fired(); - if (queued() > 0 && parent_->space() > 0) { - arm_last(); - } - } -} - -template -merge_conveyor::merge_conveyor(lent> d) - : data_{std::move(d)} {} - -template merge_conveyor::~merge_conveyor() {} - -template void merge_conveyor::attach(conveyor conveyor) { - auto sp = data_.lock(); - SAW_ASSERT(sp) { return; } - - sp->attach(std::move(conveyor)); -} - -template -merge_conveyor_node::merge_conveyor_node(our> d) - : data_{d} { - SAW_ASSERT(data_) { return; } - - data_->merger = this; -} - -template merge_conveyor_node::~merge_conveyor_node() {} - -template -error_or> -merge_conveyor_node::swap_child(own &&swapee_) noexcept { - (void)swapee_; - return make_error( - "merge_conveyor_node::appendage should block calls to this class"); -} - -template -void merge_conveyor_node::get_result(error_or_value &eov) noexcept { - error_or> &err_or_val = eov.as>(); - - SAW_ASSERT(data_) { return; } - - /// @todo search appendages for result - - auto &appendages = data_->appendages; - next_appendage_ = std::min(appendages.size(), next_appendage_); - - for (size_t i = next_appendage_; i < appendages.size(); ++i) { - if (appendages[i]->queued() > 0) { - err_or_val = std::move(appendages[i]->error_or_value_.value()); - appendages[i]->error_or_value_ = std::nullopt; - next_appendage_ = i + 1; - return; - } - } - for (size_t i = 0; i < next_appendage_; ++i) { - if (appendages[i]->queued() > 0) { - err_or_val = std::move(appendages[i]->error_or_value_.value()); - appendages[i]->error_or_value_ = std::nullopt; - next_appendage_ = i + 1; - return; - } - } - - err_or_val = make_error("No value in Merge appendages"); -} - -template void merge_conveyor_node::fire() { - SAW_ASSERT(queued() > 0) { return; } - - if (parent_) { - parent_->child_has_fired(); - - if (queued() > 0 && parent_->space() > 0) { - arm_later(); - } - } -} - -template size_t merge_conveyor_node::space() const { return 0; } - -template size_t merge_conveyor_node::queued() const { - SAW_ASSERT(data_) { return 0; } - - size_t queue_count = 0; - - for (auto &iter : data_->appendages) { - queue_count += iter->queued(); - } - - return queue_count; -} - -template void merge_conveyor_node::child_has_fired() { - /// This can never happen - assert(false); -} - -template void merge_conveyor_node::parent_has_fired() { - SAW_ASSERT(parent_) { return; } - if (queued() > 0) { - if (parent_->space() > 0) { - arm_later(); - } - } -} - -/** - * merge_conveyor_node::Apendage - */ - -template -error_or> -merge_conveyor_node::appendage::swap_child(own &&swapee_) { - own old_child = std::move(child); - - child = std::move(swapee_); - - // This case should never happen - SAW_ASSERT(old_child) { return make_error("No child exists"); } - - return old_child; -} - -template -void merge_conveyor_node::appendage::get_result(error_or_value &eov) { - error_or> &err_or_val = eov.as>(); - - SAW_ASSERT(queued() > 0) { - err_or_val = - make_error("No element queued in Merge appendage Node"); - return; - } - - err_or_val = std::move(error_or_value_.value()); - error_or_value_ = std::nullopt; -} - -template size_t merge_conveyor_node::appendage::space() const { - SAW_ASSERT(merger) { return 0; } - - if (error_or_value_.has_value()) { - return 0; - } - - return 1; -} - -template size_t merge_conveyor_node::appendage::queued() const { - SAW_ASSERT(merger) { return 0; } - - if (error_or_value_.has_value()) { - return 1; - } - - return 0; -} - -/// @todo delete this function. Replaced by the regular get_result -template -void merge_conveyor_node::appendage::get_appendage_result( - error_or_value &eov) { - error_or> &err_or_val = eov.as>(); - - SAW_ASSERT(queued() > 0) { - err_or_val = - make_error("No element queued in Merge appendage Node"); - return; - } - - err_or_val = std::move(error_or_value_.value()); - error_or_value_ = std::nullopt; -} - -template -void merge_conveyor_node::appendage::child_has_fired() { - SAW_ASSERT(!error_or_value_.has_value()) { return; } - error_or> eov; - child->get_result(eov); - - error_or_value_ = std::move(eov); - - if (!merger->is_armed()) { - merger->arm_later(); - } -} - -template -void merge_conveyor_node::appendage::parent_has_fired() { - conveyor_storage *child_storage = child->next_storage(); - if (child_storage) { - child_storage->parent_has_fired(); - } -} - -template -void merge_conveyor_node::appendage::set_parent(conveyor_storage *par) { - SAW_ASSERT(merger) { return; } - - SAW_ASSERT(child) { return; } - - parent_ = par; -} - -template -void merge_conveyor_node_data::attach(conveyor conv) { - auto nas = conveyor::from_conveyor(std::move(conv)); - SAW_ASSERT(nas) { return; } - conveyor_storage *storage = nas->next_storage(); - SAW_ASSERT(storage) { return; } - - auto merge_node_appendage = - heap::appendage>(std::move(nas), - *merger); - auto merge_node_appendage_ptr = merge_node_appendage.get(); - - storage->set_parent(merge_node_appendage.get()); - - SAW_ASSERT(merger) { return; } - - conveyor_storage *mrg_storage = merger->next_storage(); - SAW_ASSERT(mrg_storage) { return; } - - merge_node_appendage->set_parent(mrg_storage); - - appendages.push_back(std::move(merge_node_appendage)); - - /// @todo return this. necessary? maybe for the weird linking setup - /// maybe not - // return merge_node_appendage_ptr; -} - -template -void merge_conveyor_node_data::governing_node_destroyed() { - appendages.clear(); - merger = nullptr; -} - -template adapt_conveyor_feeder::~adapt_conveyor_feeder() { - if (feedee_) { - feedee_->set_feeder(nullptr); - feedee_ = nullptr; - } -} - -template -void adapt_conveyor_feeder::set_feedee(adapt_conveyor_node *feedee_p) { - feedee_ = feedee_p; -} - -template void adapt_conveyor_feeder::feed(T &&value) { - if (feedee_) { - feedee_->feed(std::move(value)); - } -} - -template void adapt_conveyor_feeder::fail(error &&error) { - if (feedee_) { - feedee_->fail(std::move(error)); - } -} - -template size_t adapt_conveyor_feeder::queued() const { - if (feedee_) { - return feedee_->queued(); - } - return 0; -} - -template size_t adapt_conveyor_feeder::space() const { - if (feedee_) { - return feedee_->space(); - } - return 0; -} - -template -error adapt_conveyor_feeder::swap(conveyor &&conv) noexcept { - SAW_ASSERT(feedee_) { return make_error("No feedee connected"); } - - auto node = conveyor::from_conveyor(std::move(conv)); - - feedee_->swap_child(std::move(node)); - - return no_error(); -} - -template -adapt_conveyor_node::adapt_conveyor_node() : conveyor_event_storage{} {} - -template adapt_conveyor_node::~adapt_conveyor_node() { - if (feeder_) { - feeder_->set_feedee(nullptr); - feeder_ = nullptr; - } -} - -template -error_or> -adapt_conveyor_node::swap_child(own &&swapee) noexcept { - // This should return the owning pointer of this instance - auto myself_err = parent_node_.swap_child_of_parent(std::move(swapee)); - - if (myself_err.is_error()) { - return myself_err; - } - - auto &myself = myself_err.get_value(); - - assert(myself.get() == this); - - return myself_err; -} - -template -conveyor_storage *adapt_conveyor_node::next_storage() noexcept { - return static_cast(this); -} - -template -void adapt_conveyor_node::notify_parent_attached( - conveyor_node &par) noexcept { - parent_node_.change_parent(&par); -} - -template -void adapt_conveyor_node::set_feeder(adapt_conveyor_feeder *feeder_p) { - feeder_ = feeder_p; -} - -template void adapt_conveyor_node::feed(T &&value) { - storage_.push(std::move(value)); - arm_next(); -} - -template void adapt_conveyor_node::fail(error &&error) { - storage_.push(std::move(error)); - arm_next(); -} - -template size_t adapt_conveyor_node::queued() const { - return storage_.size(); -} - -template size_t adapt_conveyor_node::space() const { - return std::numeric_limits::max() - storage_.size(); -} - -template -void adapt_conveyor_node::get_result(error_or_value &err_or_val) { - if (!storage_.empty()) { - err_or_val.as() = std::move(storage_.front()); - storage_.pop(); - } else { - err_or_val.as() = make_error( - "Signal for retrieval of storage sent even though no " - "data is present"); - } -} - -template void adapt_conveyor_node::child_has_fired() { - // Adapt node has no children - assert(false); -} - -template void adapt_conveyor_node::parent_has_fired() { - SAW_ASSERT(parent_) { return; } - - if (parent_->space() == 0) { - return; - } -} - -template void adapt_conveyor_node::fire() { - if (parent_) { - parent_->child_has_fired(); - - if (storage_.size() > 0) { - arm_later(); - } - } -} - -template one_time_conveyor_feeder::~one_time_conveyor_feeder() { - if (feedee_) { - feedee_->set_feeder(nullptr); - feedee_ = nullptr; - } -} - -template -void one_time_conveyor_feeder::set_feedee( - one_time_conveyor_node *feedee_p) { - feedee_ = feedee_p; -} - -template void one_time_conveyor_feeder::feed(T &&value) { - if (feedee_) { - feedee_->feed(std::move(value)); - } -} - -template void one_time_conveyor_feeder::fail(error &&error) { - if (feedee_) { - feedee_->fail(std::move(error)); - } -} - -template size_t one_time_conveyor_feeder::queued() const { - if (feedee_) { - return feedee_->queued(); - } - return 0; -} - -template size_t one_time_conveyor_feeder::space() const { - if (feedee_) { - return feedee_->space(); - } - return 0; -} - -template one_time_conveyor_node::~one_time_conveyor_node() { - if (feeder_) { - feeder_->set_feedee(nullptr); - feeder_ = nullptr; - } -} - -template -void one_time_conveyor_node::set_feeder( - one_time_conveyor_feeder *feeder_p) { - feeder_ = feeder_p; -} - -template void one_time_conveyor_node::feed(T &&value) { - storage_ = std::move(value); - arm_next(); -} - -template void one_time_conveyor_node::fail(error &&error) { - storage_ = std::move(error); - arm_next(); -} - -template size_t one_time_conveyor_node::queued() const { - return storage_.has_value() ? 1 : 0; -} - -template size_t one_time_conveyor_node::space() const { - return passed_ ? 0 : 1; -} - -template -void one_time_conveyor_node::get_result(error_or_value &err_or_val) { - if (storage_.has_value()) { - err_or_val.as() = std::move(storage_.value()); - storage_ = std::nullopt; - } else { - err_or_val.as() = make_error( - "Signal for retrieval of storage sent even though no " - "data is present"); - } -} - -template void one_time_conveyor_node::fire() { - if (parent_) { - parent_->child_has_fired(); - } -} - -} // namespace saw diff --git a/modules/async/c++/async.tmpl.hpp b/modules/async/c++/async.tmpl.hpp new file mode 100644 index 0000000..8fcb59c --- /dev/null +++ b/modules/async/c++/async.tmpl.hpp @@ -0,0 +1,767 @@ +#pragma once + +#include +#include + +#include +// Template inlining + +namespace saw { + +template conveyor_result execLater(Func &&func) { + conveyor conveyor{fix_void{}}; + return conveyor.then(std::move(func)); +} + +template +conveyor::conveyor(fix_void value) : conveyor_base(nullptr) { + // Is there any way to do this? + // @todo new conveyor_base constructor for Immediate values + + own>> immediate = + heap>>(std::move(value)); + + if (!immediate) { + return; + } + + node_ = std::move(immediate); +} + +template +conveyor::conveyor(error &&err) : conveyor_base(nullptr) { + own>> immediate = + heap>>(std::move(err)); + + if (!immediate) { + return; + } + + node_ = std::move(immediate); +} + +template +conveyor::conveyor(own node_p) + : conveyor_base{std::move(node_p)} {} + +template +template +conveyor_result conveyor::then(Func &&func, + ErrorFunc &&error_func) { + own conversion_node = + heap>, fix_void, + Func, ErrorFunc>>( + std::move(node_), std::move(func), std::move(error_func)); + + return conveyor>>::to_conveyor( + std::move(conversion_node)); +} + +template conveyor conveyor::buffer(size_t size) { + SAW_ASSERT(node_) { return conveyor{own{nullptr}}; } + conveyor_storage *storage = node_->next_storage(); + SAW_ASSERT(storage) { return conveyor{own{nullptr}}; } + + own>> storage_node = + heap>>(std::move(node_), size); + + conveyor_storage *storage_ptr = + static_cast(storage_node.get()); + + storage->set_parent(storage_ptr); + return conveyor{std::move(storage_node)}; +} + +template +template +conveyor conveyor::attach(Args &&...args) { + own> attach_node = + heap>(std::move(node_), + std::move(args...)); + return conveyor{std::move(attach_node)}; +} + +template +std::pair, merge_conveyor> conveyor::merge() { + our> data = + share>(); + + own> merge_node = heap>(data); + + SAW_ASSERT(node_) { + return std::make_pair(conveyor{own{nullptr}}, + merge_conveyor{}); + } + conveyor_storage *storage = node_->next_storage(); + SAW_ASSERT(storage) { + return std::make_pair(conveyor{own{nullptr}}, + merge_conveyor{}); + } + + data->attach(conveyor::to_conveyor(std::move(node_))); + + merge_conveyor node_ref{data}; + + return std::make_pair(conveyor{std::move(merge_node)}, + std::move(node_ref)); +} + +template <> +template +conveyor_sink conveyor::sink(ErrorFunc &&error_func) { + conveyor_storage *storage = node_->next_storage(); + SAW_ASSERT(storage) { return conveyor_sink{}; } + + own sink_node = + heap(std::move(node_)); + conveyor_storage *storage_ptr = + static_cast(sink_node.get()); + + storage->set_parent(storage_ptr); + + return conveyor_sink{std::move(sink_node)}; +} + +void detach_conveyor(conveyor &&conveyor); + +template +template +void conveyor::detach(ErrorFunc &&func) { + detach_conveyor(std::move(then([](T &&) {}, std::move(func)))); +} + +template <> +template +void conveyor::detach(ErrorFunc &&func) { + detach_conveyor(std::move(then([]() {}, std::move(func)))); +} + +template +conveyor conveyor::to_conveyor(own node) { + return conveyor{std::move(node)}; +} + +template +own conveyor::from_conveyor(conveyor conveyor) { + return std::move(conveyor.node_); +} + +template error_or> conveyor::take() { + SAW_ASSERT(node_) { + return error_or>{ + make_error("conveyor in invalid state")}; + } + conveyor_storage *storage = node_->next_storage(); + if (storage) { + if (storage->queued() > 0) { + error_or> result; + node_->get_result(result); + return result; + } else { + return error_or>{ + make_error("conveyor buffer has no elements")}; + } + } else { + return error_or>{ + make_error("conveyor node has no child storage")}; + } +} + +template conveyor_and_feeder new_conveyor_and_feeder() { + own>> feeder = + heap>>(); + own>> node = + heap>>(); + + feeder->set_feedee(node.get()); + node->set_feeder(feeder.get()); + + return conveyor_and_feeder{std::move(feeder), + conveyor::to_conveyor(std::move(node))}; +} + +// QueueBuffer +template void queue_buffer_conveyor_node::fire() { + if (child_mixin_.child) { + if (!storage_.empty()) { + if (storage_.front().is_error()) { + if (storage_.front().get_error().is_critical()) { + child_mixin_.child = nullptr; + } + } + } + } + + bool has_space_before_fire = space() > 0; + + if (parent_) { + parent_->child_has_fired(); + if (!storage_.empty() && parent_->space() > 0) { + arm_later(); + } + } + + if (!child_mixin_.child) { + while (!storage_.empty()) { + storage_.pop(); + } + return; + } + + conveyor_storage *ch_storage = child_mixin_.child->next_storage(); + if (ch_storage && !has_space_before_fire) { + ch_storage->parent_has_fired(); + } +} + +template +void queue_buffer_conveyor_node::get_result(error_or_value &eov) noexcept { + error_or &err_or_val = eov.as(); + err_or_val = std::move(storage_.front()); + storage_.pop(); +} + +template size_t queue_buffer_conveyor_node::space() const { + return max_store_ - storage_.size(); +} + +template size_t queue_buffer_conveyor_node::queued() const { + return storage_.size(); +} + +template void queue_buffer_conveyor_node::child_has_fired() { + if (child_mixin_.child && storage_.size() < max_store_) { + error_or eov; + child_mixin_.child->get_result(eov); + + if (eov.is_error()) { + if (eov.get_error().is_critical()) { + } + } + + storage_.push(std::move(eov)); + if (!is_armed()) { + arm_later(); + } + } +} + +template void queue_buffer_conveyor_node::parent_has_fired() { + SAW_ASSERT(parent_) { return; } + + if (parent_->space() == 0) { + return; + } + + if (queued() > 0) { + arm_later(); + } +} + +template +immediate_conveyor_node::immediate_conveyor_node(fix_void &&val) + : value_{std::move(val)}, retrieved_{0} {} + +template +immediate_conveyor_node::immediate_conveyor_node(error &&error) + : value_{std::move(error)}, retrieved_{0} {} + +template size_t immediate_conveyor_node::space() const { + return 0; +} + +template size_t immediate_conveyor_node::queued() const { + return retrieved_ > 1 ? 0 : 1; +} + +template void immediate_conveyor_node::child_has_fired() { + // Impossible case + assert(false); +} + +template void immediate_conveyor_node::parent_has_fired() { + SAW_ASSERT(parent_) { return; } + assert(parent_->space() > 0); + + if (queued() > 0) { + arm_next(); + } +} + +template void immediate_conveyor_node::fire() { + + if (parent_) { + parent_->child_has_fired(); + if (queued() > 0 && parent_->space() > 0) { + arm_last(); + } + } +} + +template +merge_conveyor::merge_conveyor(lent> d) + : data_{std::move(d)} {} + +template merge_conveyor::~merge_conveyor() {} + +template void merge_conveyor::attach(conveyor conveyor) { + auto sp = data_.lock(); + SAW_ASSERT(sp) { return; } + + sp->attach(std::move(conveyor)); +} + +template +merge_conveyor_node::merge_conveyor_node(our> d) + : data_{d} { + SAW_ASSERT(data_) { return; } + + data_->merger = this; +} + +template merge_conveyor_node::~merge_conveyor_node() {} + +template +error_or> +merge_conveyor_node::swap_child(own &&swapee_) noexcept { + (void)swapee_; + return make_error( + "merge_conveyor_node::appendage should block calls to this class"); +} + +template +void merge_conveyor_node::get_result(error_or_value &eov) noexcept { + error_or> &err_or_val = eov.as>(); + + SAW_ASSERT(data_) { return; } + + /// @todo search appendages for result + + auto &appendages = data_->appendages; + next_appendage_ = std::min(appendages.size(), next_appendage_); + + for (size_t i = next_appendage_; i < appendages.size(); ++i) { + if (appendages[i]->queued() > 0) { + err_or_val = std::move(appendages[i]->error_or_value_.value()); + appendages[i]->error_or_value_ = std::nullopt; + next_appendage_ = i + 1; + return; + } + } + for (size_t i = 0; i < next_appendage_; ++i) { + if (appendages[i]->queued() > 0) { + err_or_val = std::move(appendages[i]->error_or_value_.value()); + appendages[i]->error_or_value_ = std::nullopt; + next_appendage_ = i + 1; + return; + } + } + + err_or_val = make_error("No value in Merge appendages"); +} + +template void merge_conveyor_node::fire() { + SAW_ASSERT(queued() > 0) { return; } + + if (parent_) { + parent_->child_has_fired(); + + if (queued() > 0 && parent_->space() > 0) { + arm_later(); + } + } +} + +template size_t merge_conveyor_node::space() const { return 0; } + +template size_t merge_conveyor_node::queued() const { + SAW_ASSERT(data_) { return 0; } + + size_t queue_count = 0; + + for (auto &iter : data_->appendages) { + queue_count += iter->queued(); + } + + return queue_count; +} + +template void merge_conveyor_node::child_has_fired() { + /// This can never happen + assert(false); +} + +template void merge_conveyor_node::parent_has_fired() { + SAW_ASSERT(parent_) { return; } + if (queued() > 0) { + if (parent_->space() > 0) { + arm_later(); + } + } +} + +/** + * merge_conveyor_node::Apendage + */ + +template +error_or> +merge_conveyor_node::appendage::swap_child(own &&swapee_) { + own old_child = std::move(child); + + child = std::move(swapee_); + + // This case should never happen + SAW_ASSERT(old_child) { return make_error("No child exists"); } + + return old_child; +} + +template +void merge_conveyor_node::appendage::get_result(error_or_value &eov) { + error_or> &err_or_val = eov.as>(); + + SAW_ASSERT(queued() > 0) { + err_or_val = + make_error("No element queued in Merge appendage Node"); + return; + } + + err_or_val = std::move(error_or_value_.value()); + error_or_value_ = std::nullopt; +} + +template size_t merge_conveyor_node::appendage::space() const { + SAW_ASSERT(merger) { return 0; } + + if (error_or_value_.has_value()) { + return 0; + } + + return 1; +} + +template size_t merge_conveyor_node::appendage::queued() const { + SAW_ASSERT(merger) { return 0; } + + if (error_or_value_.has_value()) { + return 1; + } + + return 0; +} + +/// @todo delete this function. Replaced by the regular get_result +template +void merge_conveyor_node::appendage::get_appendage_result( + error_or_value &eov) { + error_or> &err_or_val = eov.as>(); + + SAW_ASSERT(queued() > 0) { + err_or_val = + make_error("No element queued in Merge appendage Node"); + return; + } + + err_or_val = std::move(error_or_value_.value()); + error_or_value_ = std::nullopt; +} + +template +void merge_conveyor_node::appendage::child_has_fired() { + SAW_ASSERT(!error_or_value_.has_value()) { return; } + error_or> eov; + child->get_result(eov); + + error_or_value_ = std::move(eov); + + if (!merger->is_armed()) { + merger->arm_later(); + } +} + +template +void merge_conveyor_node::appendage::parent_has_fired() { + conveyor_storage *child_storage = child->next_storage(); + if (child_storage) { + child_storage->parent_has_fired(); + } +} + +template +void merge_conveyor_node::appendage::set_parent(conveyor_storage *par) { + SAW_ASSERT(merger) { return; } + + SAW_ASSERT(child) { return; } + + parent_ = par; +} + +template +void merge_conveyor_node_data::attach(conveyor conv) { + auto nas = conveyor::from_conveyor(std::move(conv)); + SAW_ASSERT(nas) { return; } + conveyor_storage *storage = nas->next_storage(); + SAW_ASSERT(storage) { return; } + + auto merge_node_appendage = + heap::appendage>(std::move(nas), + *merger); + auto merge_node_appendage_ptr = merge_node_appendage.get(); + + storage->set_parent(merge_node_appendage.get()); + + SAW_ASSERT(merger) { return; } + + conveyor_storage *mrg_storage = merger->next_storage(); + SAW_ASSERT(mrg_storage) { return; } + + merge_node_appendage->set_parent(mrg_storage); + + appendages.push_back(std::move(merge_node_appendage)); + + /// @todo return this. necessary? maybe for the weird linking setup + /// maybe not + // return merge_node_appendage_ptr; +} + +template +void merge_conveyor_node_data::governing_node_destroyed() { + appendages.clear(); + merger = nullptr; +} + +template adapt_conveyor_feeder::~adapt_conveyor_feeder() { + if (feedee_) { + feedee_->set_feeder(nullptr); + feedee_ = nullptr; + } +} + +template +void adapt_conveyor_feeder::set_feedee(adapt_conveyor_node *feedee_p) { + feedee_ = feedee_p; +} + +template void adapt_conveyor_feeder::feed(T &&value) { + if (feedee_) { + feedee_->feed(std::move(value)); + } +} + +template void adapt_conveyor_feeder::fail(error &&error) { + if (feedee_) { + feedee_->fail(std::move(error)); + } +} + +template size_t adapt_conveyor_feeder::queued() const { + if (feedee_) { + return feedee_->queued(); + } + return 0; +} + +template size_t adapt_conveyor_feeder::space() const { + if (feedee_) { + return feedee_->space(); + } + return 0; +} + +template +error adapt_conveyor_feeder::swap(conveyor &&conv) noexcept { + SAW_ASSERT(feedee_) { return make_error("No feedee connected"); } + + auto node = conveyor::from_conveyor(std::move(conv)); + + feedee_->swap_child(std::move(node)); + + return no_error(); +} + +template +adapt_conveyor_node::adapt_conveyor_node() : conveyor_event_storage{} {} + +template adapt_conveyor_node::~adapt_conveyor_node() { + if (feeder_) { + feeder_->set_feedee(nullptr); + feeder_ = nullptr; + } +} + +template +error_or> +adapt_conveyor_node::swap_child(own &&swapee) noexcept { + // This should return the owning pointer of this instance + auto myself_err = parent_node_.swap_child_of_parent(std::move(swapee)); + + if (myself_err.is_error()) { + return myself_err; + } + + auto &myself = myself_err.get_value(); + + assert(myself.get() == this); + + return myself_err; +} + +template +conveyor_storage *adapt_conveyor_node::next_storage() noexcept { + return static_cast(this); +} + +template +void adapt_conveyor_node::notify_parent_attached( + conveyor_node &par) noexcept { + parent_node_.change_parent(&par); +} + +template +void adapt_conveyor_node::set_feeder(adapt_conveyor_feeder *feeder_p) { + feeder_ = feeder_p; +} + +template void adapt_conveyor_node::feed(T &&value) { + storage_.push(std::move(value)); + arm_next(); +} + +template void adapt_conveyor_node::fail(error &&error) { + storage_.push(std::move(error)); + arm_next(); +} + +template size_t adapt_conveyor_node::queued() const { + return storage_.size(); +} + +template size_t adapt_conveyor_node::space() const { + return std::numeric_limits::max() - storage_.size(); +} + +template +void adapt_conveyor_node::get_result(error_or_value &err_or_val) { + if (!storage_.empty()) { + err_or_val.as() = std::move(storage_.front()); + storage_.pop(); + } else { + err_or_val.as() = make_error( + "Signal for retrieval of storage sent even though no " + "data is present"); + } +} + +template void adapt_conveyor_node::child_has_fired() { + // Adapt node has no children + assert(false); +} + +template void adapt_conveyor_node::parent_has_fired() { + SAW_ASSERT(parent_) { return; } + + if (parent_->space() == 0) { + return; + } +} + +template void adapt_conveyor_node::fire() { + if (parent_) { + parent_->child_has_fired(); + + if (storage_.size() > 0) { + arm_later(); + } + } +} + +template one_time_conveyor_feeder::~one_time_conveyor_feeder() { + if (feedee_) { + feedee_->set_feeder(nullptr); + feedee_ = nullptr; + } +} + +template +void one_time_conveyor_feeder::set_feedee( + one_time_conveyor_node *feedee_p) { + feedee_ = feedee_p; +} + +template void one_time_conveyor_feeder::feed(T &&value) { + if (feedee_) { + feedee_->feed(std::move(value)); + } +} + +template void one_time_conveyor_feeder::fail(error &&error) { + if (feedee_) { + feedee_->fail(std::move(error)); + } +} + +template size_t one_time_conveyor_feeder::queued() const { + if (feedee_) { + return feedee_->queued(); + } + return 0; +} + +template size_t one_time_conveyor_feeder::space() const { + if (feedee_) { + return feedee_->space(); + } + return 0; +} + +template one_time_conveyor_node::~one_time_conveyor_node() { + if (feeder_) { + feeder_->set_feedee(nullptr); + feeder_ = nullptr; + } +} + +template +void one_time_conveyor_node::set_feeder( + one_time_conveyor_feeder *feeder_p) { + feeder_ = feeder_p; +} + +template void one_time_conveyor_node::feed(T &&value) { + storage_ = std::move(value); + arm_next(); +} + +template void one_time_conveyor_node::fail(error &&error) { + storage_ = std::move(error); + arm_next(); +} + +template size_t one_time_conveyor_node::queued() const { + return storage_.has_value() ? 1 : 0; +} + +template size_t one_time_conveyor_node::space() const { + return passed_ ? 0 : 1; +} + +template +void one_time_conveyor_node::get_result(error_or_value &err_or_val) { + if (storage_.has_value()) { + err_or_val.as() = std::move(storage_.value()); + storage_ = std::nullopt; + } else { + err_or_val.as() = make_error( + "Signal for retrieval of storage sent even though no " + "data is present"); + } +} + +template void one_time_conveyor_node::fire() { + if (parent_) { + parent_->child_has_fired(); + } +} + +} // namespace saw -- cgit v1.2.3