diff options
Diffstat (limited to 'src/async/async.h')
-rw-r--r-- | src/async/async.h | 1023 |
1 files changed, 1023 insertions, 0 deletions
diff --git a/src/async/async.h b/src/async/async.h new file mode 100644 index 0000000..4cfed60 --- /dev/null +++ b/src/async/async.h @@ -0,0 +1,1023 @@ +#pragma once + +#include <forstio/core/common.h> +#include <forstio/core/error.h> + +#include <chrono> +#include <functional> +#include <limits> +#include <list> +#include <queue> +#include <type_traits> + +namespace saw { +class conveyor_storage; +class conveyor_node { +public: + conveyor_node(); + virtual ~conveyor_node() = default; + + /** + * Internal method to retrieve results from children + */ + virtual void get_result(error_or_value &err_or_val) = 0; + + /** + * Swap out child with another one + */ + virtual error_or<own<conveyor_node>> + swap_child(own<conveyor_node> &&swapee_) = 0; + + /** + * Retrieve the next storage node + */ + virtual conveyor_storage *next_storage() = 0; + + /** + * Notify that a new parent was attached + * Only relevant for the feeding nodes + */ + virtual void notify_parent_attached(conveyor_node &){}; +}; + +class conveyor_node_with_child_mixin final { +public: + own<conveyor_node> child = nullptr; + + conveyor_node_with_child_mixin(own<conveyor_node> &&child_, + conveyor_node &owner_); + ~conveyor_node_with_child_mixin() = default; + + /** + * Swap out children and return the child ptr, since the caller is the child + * itself. Stack needs to be cleared before the child is destroyed, so the + * swapped out node is returned as well. + */ + error_or<own<conveyor_node>> swap_child(own<conveyor_node> &&swapee); +}; + +class conveyor_node_with_parent_mixin final { +public: + conveyor_node *parent = nullptr; + + error_or<own<conveyor_node>> + swap_child_of_parent(own<conveyor_node> &&swapee) { + SAW_ASSERT(parent) { + return make_error<err::invalid_state>( + "Can't swap child, because parent doesn't exist"); + } + + return parent->swap_child(std::move(swapee)); + } + void change_parent(conveyor_node *p) { parent = p; } +}; + +class event_loop; +class wait_scope; +/* + * Event class similar to capn'proto. + * https://github.com/capnproto/capnproto + */ +class event { +private: + event_loop &loop_; + event **prev_ = nullptr; + event *next_ = nullptr; + + friend class event_loop; + +public: + event(); + event(event_loop &loop); + virtual ~event(); + + virtual void fire() = 0; + + void arm_next(); + void arm_later(); + void arm_last(); + void disarm(); + + bool is_armed() const; +}; + +class conveyor_storage { +protected: + conveyor_storage *parent_ = nullptr; + +public: + conveyor_storage(); + virtual ~conveyor_storage(); + + virtual size_t space() const = 0; + virtual size_t queued() const = 0; + virtual void child_has_fired() = 0; + virtual void parent_has_fired() = 0; + + virtual void set_parent(conveyor_storage *parent) = 0; + conveyor_storage *get_parent() const; +}; + +class conveyor_event_storage : public conveyor_storage, public event { +public: + conveyor_event_storage(); + virtual ~conveyor_event_storage() = default; + + void set_parent(conveyor_storage *parent) override; +}; + +class conveyor_base { +protected: + own<conveyor_node> node_; + +public: + conveyor_base(own<conveyor_node> &&node_p); + virtual ~conveyor_base() = default; + + conveyor_base(conveyor_base &&) = default; + conveyor_base &operator=(conveyor_base &&) = default; + + void get(error_or_value &err_or_val); +}; + +template <typename T> class conveyor; + +template <typename T> conveyor<T> chained_conveyor_type(T *); + +// template <typename T> Conveyor<T> chainedConveyorType(Conveyor<T> *); + +template <typename T> T remove_error_or_type(T *); + +template <typename T> T remove_error_or_type(error_or<T> *); + +template <typename T> +using remove_error_or = decltype(remove_error_or_type((T *)nullptr)); + +template <typename T> +using chained_conveyors = decltype(chained_conveyor_type((T *)nullptr)); + +template <typename Func, typename T> +using conveyor_result = + chained_conveyors<remove_error_or<return_type<Func, T>>>; + +struct propagate_error { +public: + error operator()(const error &err) const; + error operator()(error &&err); +}; + +class conveyor_sink { +private: + own<conveyor_node> node_; + +public: + conveyor_sink(); + conveyor_sink(own<conveyor_node> &&node); + + conveyor_sink(conveyor_sink &&) = default; + conveyor_sink &operator=(conveyor_sink &&) = default; +}; + +template <typename T> class merge_conveyor_node_data; + +template <typename T> class merge_conveyor { +private: + lent<merge_conveyor_node_data<T>> data_; + +public: + merge_conveyor() = default; + merge_conveyor(lent<merge_conveyor_node_data<T>> d); + ~merge_conveyor(); + + void attach(conveyor<T> conv); +}; + +/** + * Main interface for async operations. + */ +template <typename T> class conveyor final : public conveyor_base { +public: + /** + * Construct an immediately fulfilled node + */ + conveyor(fix_void<T> value); + + /** + * Construct an immediately failed node + */ + conveyor(error &&err); + + /** + * Construct a conveyor with a child node + */ + conveyor(own<conveyor_node> node_p); + + conveyor(conveyor<T> &&) = default; + conveyor<T> &operator=(conveyor<T> &&) = default; + + /** + * This method converts values or errors from children + */ + template <typename Func, typename ErrorFunc = propagate_error> + [[nodiscard]] conveyor_result<Func, T> + then(Func &&func, ErrorFunc &&error_func = propagate_error()); + + /** + * This method adds a buffer node in the conveyor chains which acts as a + * scheduler interrupt point and collects elements up to the supplied limit. + */ + [[nodiscard]] conveyor<T> + buffer(size_t limit = std::numeric_limits<size_t>::max()); + + /** + * This method just takes ownership of any supplied types, + * which are destroyed when the chain gets destroyed. + * Useful for resource lifetime control. + */ + template <typename... Args> + [[nodiscard]] conveyor<T> attach(Args &&...args); + + /** @todo implement + * This method limits the total amount of passed elements + * Be careful where you place this node into the chain. + * If you meant to fork it and destroy paths you shouldn't place + * an interrupt point between the fork and this limiter + */ + [[nodiscard]] conveyor<T> limit(size_t val = 1); + + /** + * + */ + [[nodiscard]] std::pair<conveyor<T>, merge_conveyor<T>> merge(); + + /** + * Moves the conveyor chain into a thread local storage point which drops + * every element. Use sink() if you want to control the lifetime of a + * conveyor chain + */ + template <typename ErrorFunc = propagate_error> + void detach(ErrorFunc &&err_func = propagate_error()); + /** + * Creates a local sink which drops elements, but lifetime control remains + * in your hand. + */ + template <typename ErrorFunc = propagate_error> + [[nodiscard]] conveyor_sink + sink(ErrorFunc &&error_func = propagate_error()); + + /** + * If no sink() or detach() is used you have to take elements out of the + * chain yourself. + */ + error_or<fix_void<T>> take(); + + /** @todo implement + * Specifically pump elements through this chain with the provided + * wait_scope + */ + void poll(wait_scope &wait_scope); + + // helper + static conveyor<T> to_conveyor(own<conveyor_node> node); + + // helper + static own<conveyor_node> from_conveyor(conveyor<T> conveyor); +}; + +template <typename Func> conveyor_result<Func, void> exec_later(Func &&func); + +/* + * Join Conveyors into a single one + */ +template <typename... Args> +conveyor<std::tuple<Args...>> +join_conveyors(std::tuple<conveyor<Args>...> &conveyors); + +template <typename T> class conveyor_feeder { +public: + virtual ~conveyor_feeder() = default; + + virtual void feed(T &&data) = 0; + virtual void fail(error &&error) = 0; + + virtual size_t space() const = 0; + virtual size_t queued() const = 0; + + virtual error swap(conveyor<T> &&conveyor) noexcept = 0; +}; + +template <> class conveyor_feeder<void> { +public: + virtual ~conveyor_feeder() = default; + + virtual void feed(void_t &&value = void_t{}) = 0; + virtual void fail(error &&error) = 0; + + virtual size_t space() const = 0; + virtual size_t queued() const = 0; + + virtual error swap(conveyor<void_t> &&conveyor) noexcept = 0; +}; + +template <typename T> struct conveyor_and_feeder { + own<conveyor_feeder<T>> feeder; + conveyor<T> conveyor; +}; + +template <typename T> conveyor_and_feeder<T> new_conveyor_and_feeder(); + +template <typename T> conveyor_and_feeder<T> one_time_conveyor_and_feeder(); + +enum class Signal : uint8_t { Terminate, User1 }; + +/** + * Class which acts as a correspondent between the running framework and outside + * events which may be signals from the operating system or just other threads. + * Default EventPorts are supplied by setupAsyncIo() in io.h + */ +class event_port { +public: + virtual ~event_port() = default; + + virtual conveyor<void> on_signal(Signal signal) = 0; + + virtual void poll() = 0; + virtual void wait() = 0; + virtual void wait(const std::chrono::steady_clock::duration &) = 0; + virtual void wait(const std::chrono::steady_clock::time_point &) = 0; + + virtual void wake() = 0; +}; + +class sink_conveyor_node; + +class conveyor_sink_set final : public event { +private: + /* + class Helper final : public Event { + private: + void destroySinkConveyorNode(ConveyorNode& sink); + void fail(Error&& error); + + std::vector<Own<ConveyorNode>> sink_nodes; + std::queue<ConveyorNode*> delete_nodes; + std::function<void(Error&& error)> error_handler; + + public: + ConveyorSinks() = default; + ConveyorSinks(EventLoop& event_loop); + + void add(Conveyor<void> node); + + void fire() override {} + }; + + gin::Own<Helper> helper; + */ + friend class sink_conveyor_node; + + void destroy_sink_conveyor_node(conveyor_node &sink_node); + void fail(error &&err); + + std::list<own<conveyor_node>> sink_nodes_; + + std::queue<conveyor_node *> delete_nodes_; + + std::function<void(error &&)> error_handler_; + +public: + // ConveyorSinks(); + // ConveyorSinks(EventLoop& event_loop); + conveyor_sink_set() = default; + conveyor_sink_set(event_loop &event_loop); + + void add(conveyor<void> &&node); + + void fire() override; +}; + +/* + * EventLoop class similar to capn'proto. + * https://github.com/capnproto/capnproto + */ +class event_loop { +private: + friend class event; + event *head_ = nullptr; + event **tail_ = &head_; + event **next_insert_point_ = &head_; + event **later_insert_point_ = &head_; + + bool is_runnable_ = false; + + own<event_port> event_port_ = nullptr; + + own<conveyor_sink_set> daemon_sink_ = nullptr; + + // functions + void set_runnable(bool runnable); + + friend class wait_scope; + void enter_scope(); + void leave_scope(); + + bool turn_loop(); + bool turn(); + +public: + event_loop(); + event_loop(own<event_port> &&port); + ~event_loop(); + + event_loop(event_loop &&) = default; + event_loop &operator=(event_loop &&) = default; + + bool wait(); + bool wait(const std::chrono::steady_clock::duration &); + bool wait(const std::chrono::steady_clock::time_point &); + bool poll(); + + event_port *event_port(); + + conveyor_sink_set &daemon(); +}; + +/* + * WaitScope class similar to capn'proto. + * https://github.com/capnproto/capnproto + */ +class wait_scope { +private: + event_loop &loop_; + +public: + wait_scope(event_loop &loop); + ~wait_scope(); + + void wait(); + void wait(const std::chrono::steady_clock::duration &); + void wait(const std::chrono::steady_clock::time_point &); + void poll(); +}; + +template <typename Func> conveyor_result<Func, void> yield_next(Func &&func); + +template <typename Func> conveyor_result<Func, void> yield_later(Func &&func); + +template <typename Func> conveyor_result<Func, void> yield_last(Func &&func); +} // namespace saw + +// Secret stuff +// Aka private semi hidden classes +namespace saw { + +template <typename Out, typename In> struct fix_void_caller { + template <typename Func> static Out apply(Func &func, In &&in) { + return func(std::move(in)); + } +}; + +template <typename Out> struct fix_void_caller<Out, void_t> { + template <typename Func> static Out apply(Func &func, void_t &&in) { + (void)in; + return func(); + } +}; + +template <typename In> struct fix_void_caller<void_t, In> { + template <typename Func> static void_t apply(Func &func, In &&in) { + func(std::move(in)); + return void_t{}; + } +}; + +template <> struct fix_void_caller<void_t, void_t> { + template <typename Func> static void_t apply(Func &func, void_t &&in) { + (void)in; + func(); + return void_t{}; + } +}; + +template <typename T> class adapt_conveyor_node; + +template <typename T> +class adapt_conveyor_feeder final : public conveyor_feeder<unfix_void<T>> { +private: + adapt_conveyor_node<T> *feedee_ = nullptr; + +public: + ~adapt_conveyor_feeder(); + + void set_feedee(adapt_conveyor_node<T> *feedee); + + void feed(T &&value) override; + void fail(error &&error) override; + + size_t space() const override; + size_t queued() const override; + + error swap(conveyor<T> &&conv) noexcept override; +}; + +template <typename T> +class adapt_conveyor_node final : public conveyor_node, + public conveyor_event_storage { +private: + adapt_conveyor_feeder<T> *feeder_ = nullptr; + + std::queue<error_or<unfix_void<T>>> storage_; + + conveyor_node_with_parent_mixin parent_node_; + +public: + adapt_conveyor_node(); + ~adapt_conveyor_node(); + + void set_feeder(adapt_conveyor_feeder<T> *feeder); + + void feed(T &&value); + void fail(error &&error); + + // ConveyorNode + void get_result(error_or_value &err_or_val) override; + + error_or<own<conveyor_node>> + swap_child(own<conveyor_node> &&swapee) noexcept override; + + conveyor_storage *next_storage() noexcept override; + void notify_parent_attached(conveyor_node &) noexcept override; + + // ConveyorStorage + size_t space() const override; + size_t queued() const override; + + void child_has_fired() override; + void parent_has_fired() override; + + // Event + void fire() override; +}; + +template <typename T> class one_time_conveyor_node; + +template <typename T> +class one_time_conveyor_feeder final : public conveyor_feeder<unfix_void<T>> { +private: + one_time_conveyor_node<T> *feedee_ = nullptr; + +public: + ~one_time_conveyor_feeder(); + + void set_feedee(one_time_conveyor_node<T> *feedee); + + void feed(T &&value) override; + void fail(error &&error) override; + + size_t space() const override; + size_t queued() const override; +}; + +template <typename T> +class one_time_conveyor_node final : public conveyor_node, + public conveyor_storage, + public event { +private: + one_time_conveyor_feeder<T> *feeder_ = nullptr; + + bool passed_ = false; + maybe<error_or<T>> storage_ = std::nullopt; + +public: + ~one_time_conveyor_node(); + + void set_feeder(one_time_conveyor_feeder<T> *feeder); + + void feed(T &&value); + void fail(error &&error); + + // ConveyorNode + void get_result(error_or_value &err_or_val) override; + + error_or<own<conveyor_node>> + swap_child(own<conveyor_node> &&swapee) override; + + // ConveyorStorage + size_t space() const override; + size_t queued() const override; + + void child_has_fired() override {} + void parent_has_fired() override; + + // Event + void fire() override; +}; + +/** + * This class buffers and saves incoming elements and acts as an interrupt node + * for processing calls + */ +class queue_buffer_conveyor_node_base : public conveyor_node, + public conveyor_event_storage { +protected: + conveyor_node_with_child_mixin child_mixin_; + +public: + queue_buffer_conveyor_node_base(own<conveyor_node> child_) + : conveyor_event_storage{}, child_mixin_{std::move(child_), *this} {} + virtual ~queue_buffer_conveyor_node_base() = default; + + /** + * Use mixin + */ + error_or<own<conveyor_node>> + swap_child(own<conveyor_node> &&swapee_) noexcept override; + + conveyor_storage *next_storage() noexcept override { + return static_cast<conveyor_storage *>(this); + } +}; + +template <typename T> +class queue_buffer_conveyor_node final + : public queue_buffer_conveyor_node_base { +private: + std::queue<error_or<T>> storage_; + size_t max_store_; + +public: + queue_buffer_conveyor_node(own<conveyor_node> dep, size_t max_size) + : queue_buffer_conveyor_node_base{std::move(dep)}, max_store_{ + max_size} {} + // Event + void fire() override; + // ConveyorNode + void get_result(error_or_value &eov) noexcept override; + + // ConveyorStorage + size_t space() const override; + size_t queued() const override; + + void child_has_fired() override; + void parent_has_fired() override; +}; + +class attach_conveyor_node_base : public conveyor_node { +protected: + conveyor_node_with_child_mixin child_mixin_; + +public: + attach_conveyor_node_base(own<conveyor_node> &&child_) + : child_mixin_{std::move(child_), *this} {} + + virtual ~attach_conveyor_node_base() = default; + + void get_result(error_or_value &err_or_val) noexcept override; + + /** + * Use mixin + */ + error_or<own<conveyor_node>> + swap_child(own<conveyor_node> &&swapee_) noexcept override; + + conveyor_storage *next_storage() noexcept override; +}; + +template <typename... Args> +class attach_conveyor_node final : public attach_conveyor_node_base { +public: + attach_conveyor_node(own<conveyor_node> &&dep, Args &&...args) + : attach_conveyor_node_base(std::move(dep)), attached_data_{ + std::move(args...)} {} + +private: + std::tuple<Args...> attached_data_; +}; + +class convert_conveyor_node_base : public conveyor_node { +public: + convert_conveyor_node_base(own<conveyor_node> &&dep); + virtual ~convert_conveyor_node_base() = default; + + void get_result(error_or_value &err_or_val) override; + + virtual void get_impl(error_or_value &err_or_val) = 0; + + error_or<own<conveyor_node>> + swap_child(own<conveyor_node> &&swapee) noexcept override; + + conveyor_storage *next_storage() noexcept override; + +protected: + conveyor_node_with_child_mixin child_mixin_; +}; + +template <typename T, typename DepT, typename Func, typename ErrorFunc> +class convert_conveyor_node final : public convert_conveyor_node_base { +private: + Func func_; + ErrorFunc error_func_; + + static_assert(std::is_same<DepT, remove_error_or<DepT>>::value, + "Should never be of type ErrorOr"); + +public: + convert_conveyor_node(own<conveyor_node> &&dep, Func &&func, + ErrorFunc &&error_func) + : convert_conveyor_node_base(std::move(dep)), func_{std::move(func)}, + error_func_{std::move(error_func)} {} + + void get_impl(error_or_value &err_or_val) noexcept override { + error_or<unfix_void<DepT>> dep_eov; + error_or<unfix_void<remove_error_or<T>>> &eov = + err_or_val.as<unfix_void<remove_error_or<T>>>(); + if (child_mixin_.child) { + child_mixin_.child->get_result(dep_eov); + if (dep_eov.is_value()) { + try { + + eov = fix_void_caller<T, DepT>::apply( + func_, std::move(dep_eov.get_value())); + } catch (const std::bad_alloc &) { + eov = make_error<err::out_of_memory>("Out of memory"); + } catch (const std::exception &) { + eov = make_error<err::invalid_state>( + "Exception in chain occured. Return ErrorOr<T> if you " + "want to handle errors which are recoverable"); + } + } else if (dep_eov.is_error()) { + eov = error_func_(std::move(dep_eov.get_error())); + } else { + eov = make_error<err::invalid_state>("No value set in dependency"); + } + } else { + eov = make_error<err::invalid_state>("Conveyor doesn't have child"); + } + } +}; + +class sink_conveyor_node final : public conveyor_node, + public conveyor_event_storage { +private: + conveyor_node_with_child_mixin child_mixin_; + conveyor_sink_set *conveyor_sink_; + +public: + sink_conveyor_node(own<conveyor_node> node, conveyor_sink_set &conv_sink) + : conveyor_event_storage{}, child_mixin_{std::move(node), *this}, + conveyor_sink_{&conv_sink} {} + + sink_conveyor_node(own<conveyor_node> node) + : conveyor_event_storage{}, child_mixin_{std::move(node), *this}, + conveyor_sink_{nullptr} {} + + // Event only queued if a critical error occured + void fire() override { + // Queued for destruction of children, because this acts as a sink and + // no other event should be here + child_mixin_.child = nullptr; + + if (conveyor_sink_) { + conveyor_sink_->destroy_sink_conveyor_node(*this); + conveyor_sink_ = nullptr; + } + } + + // ConveyorStorage + size_t space() const override { return 1; } + size_t queued() const override { return 0; } + + // ConveyorNode + void get_result(error_or_value &err_or_val) noexcept override { + err_or_val.as<void_t>() = + make_error<err::invalid_state>("In a sink node no result can be returned"); + } + + error_or<own<conveyor_node>> + swap_child(own<conveyor_node> &&swapee) noexcept override { + return child_mixin_.swap_child(std::move(swapee)); + } + + // ConveyorStorage + void child_has_fired() override { + if (child_mixin_.child) { + error_or<void> dep_eov; + child_mixin_.child->get_result(dep_eov); + if (dep_eov.is_error()) { + if (dep_eov.get_error().is_critical()) { + if (!is_armed()) { + arm_last(); + } + } + if (conveyor_sink_) { + conveyor_sink_->fail(std::move(dep_eov.get_error())); + } + } + } + } + + /* + * No parent needs to be fired since we always have space + */ + void parent_has_fired() override {} + + conveyor_storage *next_storage() override { + // Should never happen though + assert(false); + return nullptr; + // return static_cast<ConveyorStorage*>(this); + } +}; + +class immediate_conveyor_node_base : public conveyor_node, + public conveyor_event_storage { +private: +public: + immediate_conveyor_node_base(); + + virtual ~immediate_conveyor_node_base() = default; + + error_or<own<conveyor_node>> + swap_child(own<conveyor_node> &&swapee) noexcept override { + (void)swapee; + return make_error<err::not_supported>("Node doesn't support swapping"); + } + + conveyor_storage *next_storage() noexcept override { + return static_cast<conveyor_storage *>(this); + } +}; + +template <typename T> +class immediate_conveyor_node final : public immediate_conveyor_node_base { +private: + error_or<fix_void<T>> value_; + uint8_t retrieved_; + +public: + immediate_conveyor_node(fix_void<T> &&val); + immediate_conveyor_node(error &&error); + + // ConveyorStorage + size_t space() const override; + size_t queued() const override; + + void child_has_fired() override; + void parent_has_fired() override; + + // ConveyorNode + void get_result(error_or_value &err_or_val) noexcept override { + if (retrieved_ > 0) { + err_or_val.as<fix_void<T>>() = + make_error<err::buffer_exhausted>("Already taken value"); + } else { + err_or_val.as<fix_void<T>>() = std::move(value_); + } + if (queued() > 0) { + ++retrieved_; + } + } + + // Event + void fire() override; +}; + +/* + * Collects every incoming value and throws it in one lane + */ +class merge_conveyor_node_base : public conveyor_node, + public conveyor_event_storage { +public: + merge_conveyor_node_base(); + + virtual ~merge_conveyor_node_base() = default; + + conveyor_storage *next_storage() noexcept override { + return static_cast<conveyor_storage *>(this); + } +}; + +template <typename T> +class merge_conveyor_node : public merge_conveyor_node_base { +private: + class appendage final : public conveyor_node, public conveyor_storage { + public: + own<conveyor_node> child; + merge_conveyor_node *merger; + + maybe<error_or<fix_void<T>>> error_or_value_; + + public: + appendage(own<conveyor_node> n, merge_conveyor_node &m) + : conveyor_storage{}, child{std::move(n)}, merger{&m}, + error_or_value_{std::nullopt} {} + + bool child_storage_has_element_queued() const { + if (!child) { + return false; + } + conveyor_storage *storage = child->next_storage(); + if (storage) { + return storage->queued() > 0; + } + return false; + } + + void get_appendage_result(error_or_value &eov); + + /** + * ConveyorNode + */ + error_or<own<conveyor_node>> + swap_child(own<conveyor_node> &&swapee_) override; + + conveyor_storage *next_storage() noexcept override { + return static_cast<conveyor_storage *>(this); + } + + void get_result(error_or_value &err_or_val) override; + + /** + * ConveyorStorage + */ + size_t space() const override; + + size_t queued() const override; + + void child_has_fired() override; + + void parent_has_fired() override; + + void set_parent(conveyor_storage *par) override; + }; + + friend class merge_conveyor_node_data<T>; + friend class appendage; + + our<merge_conveyor_node_data<T>> data_; + size_t next_appendage_ = 0; + +public: + merge_conveyor_node(our<merge_conveyor_node_data<T>> data); + ~merge_conveyor_node(); + // ConveyorNode + error_or<own<conveyor_node>> + swap_child(own<conveyor_node> &&c) noexcept override; + + // Event + void get_result(error_or_value &err_or_val) noexcept override; + + void fire() override; + + // ConveyorStorage + size_t space() const override; + size_t queued() const override; + void child_has_fired() override; + void parent_has_fired() override; +}; + +template <typename T> class merge_conveyor_node_data { +public: + std::vector<own<typename merge_conveyor_node<T>::appendage>> appendages; + + merge_conveyor_node<T> *merger = nullptr; + +public: + void attach(conveyor<T> conv); + + void governing_node_destroyed(); +}; + +/* +class JoinConveyorNodeBase : public ConveyorNode, public ConveyorEventStorage { +private: + +public: +}; + +template <typename... Args> +class JoinConveyorNode final : public JoinConveyorNodeBase { +private: + template<typename T> + class Appendage : public ConveyorEventStorage { + private: + Maybe<T> data = std::nullopt; + + public: + size_t space() const override; + size_t queued() const override; + + void fire() override; + void get_result(ErrorOrValue& eov) override; + }; + + std::tuple<Appendage<Args>...> appendages; + +public: +}; + +*/ + +} // namespace saw + +#include "async.tmpl.h" |