summaryrefslogtreecommitdiff
path: root/forstio/async/async.h
diff options
context:
space:
mode:
Diffstat (limited to 'forstio/async/async.h')
-rw-r--r--forstio/async/async.h1023
1 files changed, 1023 insertions, 0 deletions
diff --git a/forstio/async/async.h b/forstio/async/async.h
new file mode 100644
index 0000000..4e4f230
--- /dev/null
+++ b/forstio/async/async.h
@@ -0,0 +1,1023 @@
+#pragma once
+
+#include <forstio/common.h>
+#include <forstio/error.h>
+
+#include <chrono>
+#include <functional>
+#include <limits>
+#include <list>
+#include <queue>
+#include <type_traits>
+
+namespace saw {
+class conveyor_storage;
+class conveyor_node {
+public:
+ conveyor_node();
+ virtual ~conveyor_node() = default;
+
+ /**
+ * Internal method to retrieve results from children
+ */
+ virtual void get_result(error_or_value &err_or_val) = 0;
+
+ /**
+ * Swap out child with another one
+ */
+ virtual error_or<own<conveyor_node>>
+ swap_child(own<conveyor_node> &&swapee_) = 0;
+
+ /**
+ * Retrieve the next storage node
+ */
+ virtual conveyor_storage *next_storage() = 0;
+
+ /**
+ * Notify that a new parent was attached
+ * Only relevant for the feeding nodes
+ */
+ virtual void notify_parent_attached(conveyor_node &){};
+};
+
+class conveyor_node_with_child_mixin final {
+public:
+ own<conveyor_node> child = nullptr;
+
+ conveyor_node_with_child_mixin(own<conveyor_node> &&child_,
+ conveyor_node &owner_);
+ ~conveyor_node_with_child_mixin() = default;
+
+ /**
+ * Swap out children and return the child ptr, since the caller is the child
+ * itself. Stack needs to be cleared before the child is destroyed, so the
+ * swapped out node is returned as well.
+ */
+ error_or<own<conveyor_node>> swap_child(own<conveyor_node> &&swapee);
+};
+
+class conveyor_node_with_parent_mixin final {
+public:
+ conveyor_node *parent = nullptr;
+
+ error_or<own<conveyor_node>>
+ swap_child_of_parent(own<conveyor_node> &&swapee) {
+ SAW_ASSERT(parent) {
+ return critical_error(
+ "Can't swap child, because parent doesn't exist");
+ }
+
+ return parent->swap_child(std::move(swapee));
+ }
+ void change_parent(conveyor_node *p) { parent = p; }
+};
+
+class event_loop;
+class wait_scope;
+/*
+ * Event class similar to capn'proto.
+ * https://github.com/capnproto/capnproto
+ */
+class event {
+private:
+ event_loop &loop_;
+ event **prev_ = nullptr;
+ event *next_ = nullptr;
+
+ friend class event_loop;
+
+public:
+ event();
+ event(event_loop &loop);
+ virtual ~event();
+
+ virtual void fire() = 0;
+
+ void arm_next();
+ void arm_later();
+ void arm_last();
+ void disarm();
+
+ bool is_armed() const;
+};
+
+class conveyor_storage {
+protected:
+ conveyor_storage *parent_ = nullptr;
+
+public:
+ conveyor_storage();
+ virtual ~conveyor_storage();
+
+ virtual size_t space() const = 0;
+ virtual size_t queued() const = 0;
+ virtual void child_has_fired() = 0;
+ virtual void parent_has_fired() = 0;
+
+ virtual void set_parent(conveyor_storage *parent) = 0;
+ conveyor_storage *get_parent() const;
+};
+
+class conveyor_event_storage : public conveyor_storage, public event {
+public:
+ conveyor_event_storage();
+ virtual ~conveyor_event_storage() = default;
+
+ void set_parent(conveyor_storage *parent) override;
+};
+
+class conveyor_base {
+protected:
+ own<conveyor_node> node_;
+
+public:
+ conveyor_base(own<conveyor_node> &&node_p);
+ virtual ~conveyor_base() = default;
+
+ conveyor_base(conveyor_base &&) = default;
+ conveyor_base &operator=(conveyor_base &&) = default;
+
+ void get(error_or_value &err_or_val);
+};
+
+template <typename T> class conveyor;
+
+template <typename T> conveyor<T> chained_conveyor_type(T *);
+
+// template <typename T> Conveyor<T> chainedConveyorType(Conveyor<T> *);
+
+template <typename T> T remove_error_or_type(T *);
+
+template <typename T> T remove_error_or_type(error_or<T> *);
+
+template <typename T>
+using remove_error_or = decltype(remove_error_or_type((T *)nullptr));
+
+template <typename T>
+using chained_conveyors = decltype(chained_conveyor_type((T *)nullptr));
+
+template <typename Func, typename T>
+using conveyor_result =
+ chained_conveyors<remove_error_or<return_type<Func, T>>>;
+
+struct propagate_error {
+public:
+ error operator()(const error &err) const;
+ error operator()(error &&err);
+};
+
+class conveyor_sink {
+private:
+ own<conveyor_node> node_;
+
+public:
+ conveyor_sink();
+ conveyor_sink(own<conveyor_node> &&node);
+
+ conveyor_sink(conveyor_sink &&) = default;
+ conveyor_sink &operator=(conveyor_sink &&) = default;
+};
+
+template <typename T> class merge_conveyor_node_data;
+
+template <typename T> class merge_conveyor {
+private:
+ lent<merge_conveyor_node_data<T>> data_;
+
+public:
+ merge_conveyor() = default;
+ merge_conveyor(lent<merge_conveyor_node_data<T>> d);
+ ~merge_conveyor();
+
+ void attach(conveyor<T> conv);
+};
+
+/**
+ * Main interface for async operations.
+ */
+template <typename T> class conveyor final : public conveyor_base {
+public:
+ /**
+ * Construct an immediately fulfilled node
+ */
+ conveyor(fix_void<T> value);
+
+ /**
+ * Construct an immediately failed node
+ */
+ conveyor(error &&err);
+
+ /**
+ * Construct a conveyor with a child node
+ */
+ conveyor(own<conveyor_node> node_p);
+
+ conveyor(conveyor<T> &&) = default;
+ conveyor<T> &operator=(conveyor<T> &&) = default;
+
+ /**
+ * This method converts values or errors from children
+ */
+ template <typename Func, typename ErrorFunc = propagate_error>
+ [[nodiscard]] conveyor_result<Func, T>
+ then(Func &&func, ErrorFunc &&error_func = propagate_error());
+
+ /**
+ * This method adds a buffer node in the conveyor chains which acts as a
+ * scheduler interrupt point and collects elements up to the supplied limit.
+ */
+ [[nodiscard]] conveyor<T>
+ buffer(size_t limit = std::numeric_limits<size_t>::max());
+
+ /**
+ * This method just takes ownership of any supplied types,
+ * which are destroyed when the chain gets destroyed.
+ * Useful for resource lifetime control.
+ */
+ template <typename... Args>
+ [[nodiscard]] conveyor<T> attach(Args &&...args);
+
+ /** @todo implement
+ * This method limits the total amount of passed elements
+ * Be careful where you place this node into the chain.
+ * If you meant to fork it and destroy paths you shouldn't place
+ * an interrupt point between the fork and this limiter
+ */
+ [[nodiscard]] conveyor<T> limit(size_t val = 1);
+
+ /**
+ *
+ */
+ [[nodiscard]] std::pair<conveyor<T>, merge_conveyor<T>> merge();
+
+ /**
+ * Moves the conveyor chain into a thread local storage point which drops
+ * every element. Use sink() if you want to control the lifetime of a
+ * conveyor chain
+ */
+ template <typename ErrorFunc = propagate_error>
+ void detach(ErrorFunc &&err_func = propagate_error());
+ /**
+ * Creates a local sink which drops elements, but lifetime control remains
+ * in your hand.
+ */
+ template <typename ErrorFunc = propagate_error>
+ [[nodiscard]] conveyor_sink
+ sink(ErrorFunc &&error_func = propagate_error());
+
+ /**
+ * If no sink() or detach() is used you have to take elements out of the
+ * chain yourself.
+ */
+ error_or<fix_void<T>> take();
+
+ /** @todo implement
+ * Specifically pump elements through this chain with the provided
+ * wait_scope
+ */
+ void poll(wait_scope &wait_scope);
+
+ // helper
+ static conveyor<T> to_conveyor(own<conveyor_node> node);
+
+ // helper
+ static own<conveyor_node> from_conveyor(conveyor<T> conveyor);
+};
+
+template <typename Func> conveyor_result<Func, void> exec_later(Func &&func);
+
+/*
+ * Join Conveyors into a single one
+ */
+template <typename... Args>
+conveyor<std::tuple<Args...>>
+join_conveyors(std::tuple<conveyor<Args>...> &conveyors);
+
+template <typename T> class conveyor_feeder {
+public:
+ virtual ~conveyor_feeder() = default;
+
+ virtual void feed(T &&data) = 0;
+ virtual void fail(error &&error) = 0;
+
+ virtual size_t space() const = 0;
+ virtual size_t queued() const = 0;
+
+ virtual error swap(conveyor<T> &&conveyor) noexcept = 0;
+};
+
+template <> class conveyor_feeder<void> {
+public:
+ virtual ~conveyor_feeder() = default;
+
+ virtual void feed(void_t &&value = void_t{}) = 0;
+ virtual void fail(error &&error) = 0;
+
+ virtual size_t space() const = 0;
+ virtual size_t queued() const = 0;
+
+ virtual error swap(conveyor<void_t> &&conveyor) noexcept = 0;
+};
+
+template <typename T> struct conveyor_and_feeder {
+ own<conveyor_feeder<T>> feeder;
+ conveyor<T> conveyor;
+};
+
+template <typename T> conveyor_and_feeder<T> new_conveyor_and_feeder();
+
+template <typename T> conveyor_and_feeder<T> one_time_conveyor_and_feeder();
+
+enum class Signal : uint8_t { Terminate, User1 };
+
+/**
+ * Class which acts as a correspondent between the running framework and outside
+ * events which may be signals from the operating system or just other threads.
+ * Default EventPorts are supplied by setupAsyncIo() in io.h
+ */
+class event_port {
+public:
+ virtual ~event_port() = default;
+
+ virtual conveyor<void> on_signal(Signal signal) = 0;
+
+ virtual void poll() = 0;
+ virtual void wait() = 0;
+ virtual void wait(const std::chrono::steady_clock::duration &) = 0;
+ virtual void wait(const std::chrono::steady_clock::time_point &) = 0;
+
+ virtual void wake() = 0;
+};
+
+class sink_conveyor_node;
+
+class conveyor_sink_set final : public event {
+private:
+ /*
+ class Helper final : public Event {
+ private:
+ void destroySinkConveyorNode(ConveyorNode& sink);
+ void fail(Error&& error);
+
+ std::vector<Own<ConveyorNode>> sink_nodes;
+ std::queue<ConveyorNode*> delete_nodes;
+ std::function<void(Error&& error)> error_handler;
+
+ public:
+ ConveyorSinks() = default;
+ ConveyorSinks(EventLoop& event_loop);
+
+ void add(Conveyor<void> node);
+
+ void fire() override {}
+ };
+
+ gin::Own<Helper> helper;
+ */
+ friend class sink_conveyor_node;
+
+ void destroy_sink_conveyor_node(conveyor_node &sink_node);
+ void fail(error &&err);
+
+ std::list<own<conveyor_node>> sink_nodes_;
+
+ std::queue<conveyor_node *> delete_nodes_;
+
+ std::function<void(error &&)> error_handler_;
+
+public:
+ // ConveyorSinks();
+ // ConveyorSinks(EventLoop& event_loop);
+ conveyor_sink_set() = default;
+ conveyor_sink_set(event_loop &event_loop);
+
+ void add(conveyor<void> &&node);
+
+ void fire() override;
+};
+
+/*
+ * EventLoop class similar to capn'proto.
+ * https://github.com/capnproto/capnproto
+ */
+class event_loop {
+private:
+ friend class event;
+ event *head_ = nullptr;
+ event **tail_ = &head_;
+ event **next_insert_point_ = &head_;
+ event **later_insert_point_ = &head_;
+
+ bool is_runnable_ = false;
+
+ own<event_port> event_port_ = nullptr;
+
+ own<conveyor_sink_set> daemon_sink_ = nullptr;
+
+ // functions
+ void set_runnable(bool runnable);
+
+ friend class wait_scope;
+ void enter_scope();
+ void leave_scope();
+
+ bool turn_loop();
+ bool turn();
+
+public:
+ event_loop();
+ event_loop(own<event_port> &&port);
+ ~event_loop();
+
+ event_loop(event_loop &&) = default;
+ event_loop &operator=(event_loop &&) = default;
+
+ bool wait();
+ bool wait(const std::chrono::steady_clock::duration &);
+ bool wait(const std::chrono::steady_clock::time_point &);
+ bool poll();
+
+ event_port *event_port();
+
+ conveyor_sink_set &daemon();
+};
+
+/*
+ * WaitScope class similar to capn'proto.
+ * https://github.com/capnproto/capnproto
+ */
+class wait_scope {
+private:
+ event_loop &loop_;
+
+public:
+ wait_scope(event_loop &loop);
+ ~wait_scope();
+
+ void wait();
+ void wait(const std::chrono::steady_clock::duration &);
+ void wait(const std::chrono::steady_clock::time_point &);
+ void poll();
+};
+
+template <typename Func> conveyor_result<Func, void> yield_next(Func &&func);
+
+template <typename Func> conveyor_result<Func, void> yield_later(Func &&func);
+
+template <typename Func> conveyor_result<Func, void> yield_last(Func &&func);
+} // namespace saw
+
+// Secret stuff
+// Aka private semi hidden classes
+namespace saw {
+
+template <typename Out, typename In> struct fix_void_caller {
+ template <typename Func> static Out apply(Func &func, In &&in) {
+ return func(std::move(in));
+ }
+};
+
+template <typename Out> struct fix_void_caller<Out, void_t> {
+ template <typename Func> static Out apply(Func &func, void_t &&in) {
+ (void)in;
+ return func();
+ }
+};
+
+template <typename In> struct fix_void_caller<void_t, In> {
+ template <typename Func> static void_t apply(Func &func, In &&in) {
+ func(std::move(in));
+ return void_t{};
+ }
+};
+
+template <> struct fix_void_caller<void_t, void_t> {
+ template <typename Func> static void_t apply(Func &func, void_t &&in) {
+ (void)in;
+ func();
+ return void_t{};
+ }
+};
+
+template <typename T> class adapt_conveyor_node;
+
+template <typename T>
+class adapt_conveyor_feeder final : public conveyor_feeder<unfix_void<T>> {
+private:
+ adapt_conveyor_node<T> *feedee_ = nullptr;
+
+public:
+ ~adapt_conveyor_feeder();
+
+ void set_feedee(adapt_conveyor_node<T> *feedee);
+
+ void feed(T &&value) override;
+ void fail(error &&error) override;
+
+ size_t space() const override;
+ size_t queued() const override;
+
+ error swap(conveyor<T> &&conv) noexcept override;
+};
+
+template <typename T>
+class adapt_conveyor_node final : public conveyor_node,
+ public conveyor_event_storage {
+private:
+ adapt_conveyor_feeder<T> *feeder_ = nullptr;
+
+ std::queue<error_or<unfix_void<T>>> storage_;
+
+ conveyor_node_with_parent_mixin parent_node_;
+
+public:
+ adapt_conveyor_node();
+ ~adapt_conveyor_node();
+
+ void set_feeder(adapt_conveyor_feeder<T> *feeder);
+
+ void feed(T &&value);
+ void fail(error &&error);
+
+ // ConveyorNode
+ void get_result(error_or_value &err_or_val) override;
+
+ error_or<own<conveyor_node>>
+ swap_child(own<conveyor_node> &&swapee) noexcept override;
+
+ conveyor_storage *next_storage() noexcept override;
+ void notify_parent_attached(conveyor_node &) noexcept override;
+
+ // ConveyorStorage
+ size_t space() const override;
+ size_t queued() const override;
+
+ void child_has_fired() override;
+ void parent_has_fired() override;
+
+ // Event
+ void fire() override;
+};
+
+template <typename T> class one_time_conveyor_node;
+
+template <typename T>
+class one_time_conveyor_feeder final : public conveyor_feeder<unfix_void<T>> {
+private:
+ one_time_conveyor_node<T> *feedee_ = nullptr;
+
+public:
+ ~one_time_conveyor_feeder();
+
+ void set_feedee(one_time_conveyor_node<T> *feedee);
+
+ void feed(T &&value) override;
+ void fail(error &&error) override;
+
+ size_t space() const override;
+ size_t queued() const override;
+};
+
+template <typename T>
+class one_time_conveyor_node final : public conveyor_node,
+ public conveyor_storage,
+ public event {
+private:
+ one_time_conveyor_feeder<T> *feeder_ = nullptr;
+
+ bool passed_ = false;
+ maybe<error_or<T>> storage_ = std::nullopt;
+
+public:
+ ~one_time_conveyor_node();
+
+ void set_feeder(one_time_conveyor_feeder<T> *feeder);
+
+ void feed(T &&value);
+ void fail(error &&error);
+
+ // ConveyorNode
+ void get_result(error_or_value &err_or_val) override;
+
+ error_or<own<conveyor_node>>
+ swap_child(own<conveyor_node> &&swapee) override;
+
+ // ConveyorStorage
+ size_t space() const override;
+ size_t queued() const override;
+
+ void child_has_fired() override {}
+ void parent_has_fired() override;
+
+ // Event
+ void fire() override;
+};
+
+/**
+ * This class buffers and saves incoming elements and acts as an interrupt node
+ * for processing calls
+ */
+class queue_buffer_conveyor_node_base : public conveyor_node,
+ public conveyor_event_storage {
+protected:
+ conveyor_node_with_child_mixin child_mixin_;
+
+public:
+ queue_buffer_conveyor_node_base(own<conveyor_node> child_)
+ : conveyor_event_storage{}, child_mixin_{std::move(child_), *this} {}
+ virtual ~queue_buffer_conveyor_node_base() = default;
+
+ /**
+ * Use mixin
+ */
+ error_or<own<conveyor_node>>
+ swap_child(own<conveyor_node> &&swapee_) noexcept override;
+
+ conveyor_storage *next_storage() noexcept override {
+ return static_cast<conveyor_storage *>(this);
+ }
+};
+
+template <typename T>
+class queue_buffer_conveyor_node final
+ : public queue_buffer_conveyor_node_base {
+private:
+ std::queue<error_or<T>> storage_;
+ size_t max_store_;
+
+public:
+ queue_buffer_conveyor_node(own<conveyor_node> dep, size_t max_size)
+ : queue_buffer_conveyor_node_base{std::move(dep)}, max_store_{
+ max_size} {}
+ // Event
+ void fire() override;
+ // ConveyorNode
+ void get_result(error_or_value &eov) noexcept override;
+
+ // ConveyorStorage
+ size_t space() const override;
+ size_t queued() const override;
+
+ void child_has_fired() override;
+ void parent_has_fired() override;
+};
+
+class attach_conveyor_node_base : public conveyor_node {
+protected:
+ conveyor_node_with_child_mixin child_mixin_;
+
+public:
+ attach_conveyor_node_base(own<conveyor_node> &&child_)
+ : child_mixin_{std::move(child_), *this} {}
+
+ virtual ~attach_conveyor_node_base() = default;
+
+ void get_result(error_or_value &err_or_val) noexcept override;
+
+ /**
+ * Use mixin
+ */
+ error_or<own<conveyor_node>>
+ swap_child(own<conveyor_node> &&swapee_) noexcept override;
+
+ conveyor_storage *next_storage() noexcept override;
+};
+
+template <typename... Args>
+class attach_conveyor_node final : public attach_conveyor_node_base {
+public:
+ attach_conveyor_node(own<conveyor_node> &&dep, Args &&...args)
+ : attach_conveyor_node_base(std::move(dep)), attached_data_{
+ std::move(args...)} {}
+
+private:
+ std::tuple<Args...> attached_data_;
+};
+
+class convert_conveyor_node_base : public conveyor_node {
+public:
+ convert_conveyor_node_base(own<conveyor_node> &&dep);
+ virtual ~convert_conveyor_node_base() = default;
+
+ void get_result(error_or_value &err_or_val) override;
+
+ virtual void get_impl(error_or_value &err_or_val) = 0;
+
+ error_or<own<conveyor_node>>
+ swap_child(own<conveyor_node> &&swapee) noexcept override;
+
+ conveyor_storage *next_storage() noexcept override;
+
+protected:
+ conveyor_node_with_child_mixin child_mixin_;
+};
+
+template <typename T, typename DepT, typename Func, typename ErrorFunc>
+class convert_conveyor_node final : public convert_conveyor_node_base {
+private:
+ Func func_;
+ ErrorFunc error_func_;
+
+ static_assert(std::is_same<DepT, remove_error_or<DepT>>::value,
+ "Should never be of type ErrorOr");
+
+public:
+ convert_conveyor_node(own<conveyor_node> &&dep, Func &&func,
+ ErrorFunc &&error_func)
+ : convert_conveyor_node_base(std::move(dep)), func_{std::move(func)},
+ error_func_{std::move(error_func)} {}
+
+ void get_impl(error_or_value &err_or_val) noexcept override {
+ error_or<unfix_void<DepT>> dep_eov;
+ error_or<unfix_void<remove_error_or<T>>> &eov =
+ err_or_val.as<unfix_void<remove_error_or<T>>>();
+ if (child_mixin_.child) {
+ child_mixin_.child->get_result(dep_eov);
+ if (dep_eov.is_value()) {
+ try {
+
+ eov = fix_void_caller<T, DepT>::apply(
+ func_, std::move(dep_eov.value()));
+ } catch (const std::bad_alloc &) {
+ eov = critical_error("Out of memory");
+ } catch (const std::exception &) {
+ eov = critical_error(
+ "Exception in chain occured. Return ErrorOr<T> if you "
+ "want to handle errors which are recoverable");
+ }
+ } else if (dep_eov.is_error()) {
+ eov = error_func_(std::move(dep_eov.error()));
+ } else {
+ eov = critical_error("No value set in dependency");
+ }
+ } else {
+ eov = critical_error("Conveyor doesn't have child");
+ }
+ }
+};
+
+class sink_conveyor_node final : public conveyor_node,
+ public conveyor_event_storage {
+private:
+ conveyor_node_with_child_mixin child_mixin_;
+ conveyor_sink_set *conveyor_sink_;
+
+public:
+ sink_conveyor_node(own<conveyor_node> node, conveyor_sink_set &conv_sink)
+ : conveyor_event_storage{}, child_mixin_{std::move(node), *this},
+ conveyor_sink_{&conv_sink} {}
+
+ sink_conveyor_node(own<conveyor_node> node)
+ : conveyor_event_storage{}, child_mixin_{std::move(node), *this},
+ conveyor_sink_{nullptr} {}
+
+ // Event only queued if a critical error occured
+ void fire() override {
+ // Queued for destruction of children, because this acts as a sink and
+ // no other event should be here
+ child_mixin_.child = nullptr;
+
+ if (conveyor_sink_) {
+ conveyor_sink_->destroy_sink_conveyor_node(*this);
+ conveyor_sink_ = nullptr;
+ }
+ }
+
+ // ConveyorStorage
+ size_t space() const override { return 1; }
+ size_t queued() const override { return 0; }
+
+ // ConveyorNode
+ void get_result(error_or_value &err_or_val) noexcept override {
+ err_or_val.as<void_t>() =
+ critical_error("In a sink node no result can be returned");
+ }
+
+ error_or<own<conveyor_node>>
+ swap_child(own<conveyor_node> &&swapee) noexcept override {
+ return child_mixin_.swap_child(std::move(swapee));
+ }
+
+ // ConveyorStorage
+ void child_has_fired() override {
+ if (child_mixin_.child) {
+ error_or<void> dep_eov;
+ child_mixin_.child->get_result(dep_eov);
+ if (dep_eov.is_error()) {
+ if (dep_eov.error().is_critical()) {
+ if (!is_armed()) {
+ arm_last();
+ }
+ }
+ if (conveyor_sink_) {
+ conveyor_sink_->fail(std::move(dep_eov.error()));
+ }
+ }
+ }
+ }
+
+ /*
+ * No parent needs to be fired since we always have space
+ */
+ void parent_has_fired() override {}
+
+ conveyor_storage *next_storage() override {
+ // Should never happen though
+ assert(false);
+ return nullptr;
+ // return static_cast<ConveyorStorage*>(this);
+ }
+};
+
+class immediate_conveyor_node_base : public conveyor_node,
+ public conveyor_event_storage {
+private:
+public:
+ immediate_conveyor_node_base();
+
+ virtual ~immediate_conveyor_node_base() = default;
+
+ error_or<own<conveyor_node>>
+ swap_child(own<conveyor_node> &&swapee) noexcept override {
+ (void)swapee;
+ return recoverable_error("Node doesn't support swapping");
+ }
+
+ conveyor_storage *next_storage() noexcept override {
+ return static_cast<conveyor_storage *>(this);
+ }
+};
+
+template <typename T>
+class immediate_conveyor_node final : public immediate_conveyor_node_base {
+private:
+ error_or<fix_void<T>> value_;
+ uint8_t retrieved_;
+
+public:
+ immediate_conveyor_node(fix_void<T> &&val);
+ immediate_conveyor_node(error &&error);
+
+ // ConveyorStorage
+ size_t space() const override;
+ size_t queued() const override;
+
+ void child_has_fired() override;
+ void parent_has_fired() override;
+
+ // ConveyorNode
+ void get_result(error_or_value &err_or_val) noexcept override {
+ if (retrieved_ > 0) {
+ err_or_val.as<fix_void<T>>() =
+ make_error("Already taken value", error::code::Exhausted);
+ } else {
+ err_or_val.as<fix_void<T>>() = std::move(value_);
+ }
+ if (queued() > 0) {
+ ++retrieved_;
+ }
+ }
+
+ // Event
+ void fire() override;
+};
+
+/*
+ * Collects every incoming value and throws it in one lane
+ */
+class merge_conveyor_node_base : public conveyor_node,
+ public conveyor_event_storage {
+public:
+ merge_conveyor_node_base();
+
+ virtual ~merge_conveyor_node_base() = default;
+
+ conveyor_storage *next_storage() noexcept override {
+ return static_cast<conveyor_storage *>(this);
+ }
+};
+
+template <typename T>
+class merge_conveyor_node : public merge_conveyor_node_base {
+private:
+ class appendage final : public conveyor_node, public conveyor_storage {
+ public:
+ own<conveyor_node> child;
+ merge_conveyor_node *merger;
+
+ maybe<error_or<fix_void<T>>> error_or_value_;
+
+ public:
+ appendage(own<conveyor_node> n, merge_conveyor_node &m)
+ : conveyor_storage{}, child{std::move(n)}, merger{&m},
+ error_or_value_{std::nullopt} {}
+
+ bool child_storage_has_element_queued() const {
+ if (!child) {
+ return false;
+ }
+ conveyor_storage *storage = child->next_storage();
+ if (storage) {
+ return storage->queued() > 0;
+ }
+ return false;
+ }
+
+ void get_appendage_result(error_or_value &eov);
+
+ /**
+ * ConveyorNode
+ */
+ error_or<own<conveyor_node>>
+ swap_child(own<conveyor_node> &&swapee_) override;
+
+ conveyor_storage *next_storage() noexcept override {
+ return static_cast<conveyor_storage *>(this);
+ }
+
+ void get_result(error_or_value &err_or_val) override;
+
+ /**
+ * ConveyorStorage
+ */
+ size_t space() const override;
+
+ size_t queued() const override;
+
+ void child_has_fired() override;
+
+ void parent_has_fired() override;
+
+ void set_parent(conveyor_storage *par) override;
+ };
+
+ friend class merge_conveyor_node_data<T>;
+ friend class appendage;
+
+ our<merge_conveyor_node_data<T>> data_;
+ size_t next_appendage_ = 0;
+
+public:
+ merge_conveyor_node(our<merge_conveyor_node_data<T>> data);
+ ~merge_conveyor_node();
+ // ConveyorNode
+ error_or<own<conveyor_node>>
+ swap_child(own<conveyor_node> &&c) noexcept override;
+
+ // Event
+ void get_result(error_or_value &err_or_val) noexcept override;
+
+ void fire() override;
+
+ // ConveyorStorage
+ size_t space() const override;
+ size_t queued() const override;
+ void child_has_fired() override;
+ void parent_has_fired() override;
+};
+
+template <typename T> class merge_conveyor_node_data {
+public:
+ std::vector<own<typename merge_conveyor_node<T>::appendage>> appendages;
+
+ merge_conveyor_node<T> *merger = nullptr;
+
+public:
+ void attach(conveyor<T> conv);
+
+ void governing_node_destroyed();
+};
+
+/*
+class JoinConveyorNodeBase : public ConveyorNode, public ConveyorEventStorage {
+private:
+
+public:
+};
+
+template <typename... Args>
+class JoinConveyorNode final : public JoinConveyorNodeBase {
+private:
+ template<typename T>
+ class Appendage : public ConveyorEventStorage {
+ private:
+ Maybe<T> data = std::nullopt;
+
+ public:
+ size_t space() const override;
+ size_t queued() const override;
+
+ void fire() override;
+ void get_result(ErrorOrValue& eov) override;
+ };
+
+ std::tuple<Appendage<Args>...> appendages;
+
+public:
+};
+
+*/
+
+} // namespace saw
+
+#include "async.tmpl.h"